tajo-commits mailing list archives

From hj...@apache.org
Subject [24/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
Date Wed, 03 Dec 2014 05:30:39 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
deleted file mode 100644
index d143e58..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.filter.*;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class HBaseScanner implements Scanner {
-  private static final Log LOG = LogFactory.getLog(HBaseScanner.class);
-  private static final int DEFAULT_FETCH_SIZE = 1000;
-  private static final int MAX_LIST_SIZE = 100;
-
-  protected boolean inited = false;
-  private TajoConf conf;
-  private Schema schema;
-  private TableMeta meta;
-  private HBaseFragment fragment;
-  private Scan scan;
-  private HTableInterface htable;
-  private Configuration hbaseConf;
-  private Column[] targets;
-  private TableStats tableStats;
-  private ResultScanner scanner;
-  private AtomicBoolean finished = new AtomicBoolean(false);
-  private float progress = 0.0f;
-  private int scanFetchSize;
-  private Result[] scanResults;
-  private int scanResultIndex = -1;
-  private Column[] schemaColumns;
-
-  private ColumnMapping columnMapping;
-  private int[] targetIndexes;
-
-  private int numRows = 0;
-
-  private byte[][][] mappingColumnFamilies;
-  private boolean[] isRowKeyMappings;
-  private boolean[] isBinaryColumns;
-  private boolean[] isColumnKeys;
-  private boolean[] isColumnValues;
-
-  private int[] rowKeyFieldIndexes;
-  private char rowKeyDelimiter;
-
-  public HBaseScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
-    this.conf = (TajoConf)conf;
-    this.schema = schema;
-    this.meta = meta;
-    this.fragment = (HBaseFragment)fragment;
-    this.tableStats = new TableStats();
-  }
-
-  @Override
-  public void init() throws IOException {
-    inited = true;
-    schemaColumns = schema.toArray();
-    if (fragment != null) {
-      tableStats.setNumBytes(0);
-      tableStats.setNumBlocks(1);
-    }
-    if (schema != null) {
-      for(Column eachColumn: schema.getColumns()) {
-        ColumnStats columnStats = new ColumnStats(eachColumn);
-        tableStats.addColumnStat(columnStats);
-      }
-    }
-
-    scanFetchSize = Integer.parseInt(meta.getOption(HBaseStorageConstants.META_FETCH_ROWNUM_KEY, "" + DEFAULT_FETCH_SIZE));
-    if (targets == null) {
-      targets = schema.toArray();
-    }
-
-    columnMapping = new ColumnMapping(schema, meta);
-    targetIndexes = new int[targets.length];
-    int index = 0;
-    for (Column eachTargetColumn: targets) {
-      targetIndexes[index++] = schema.getColumnId(eachTargetColumn.getQualifiedName());
-    }
-
-    mappingColumnFamilies = columnMapping.getMappingColumns();
-    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-    isBinaryColumns = columnMapping.getIsBinaryColumns();
-    isColumnKeys = columnMapping.getIsColumnKeys();
-    isColumnValues = columnMapping.getIsColumnValues();
-
-    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
-    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
-
-    hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
-
-    initScanner();
-  }
-
-  private void initScanner() throws IOException {
-    scan = new Scan();
-    scan.setBatch(scanFetchSize);
-    scan.setCacheBlocks(false);
-    scan.setCaching(scanFetchSize);
-
-    FilterList filters = null;
-    if (targetIndexes == null || targetIndexes.length == 0) {
-      filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-      filters.addFilter(new FirstKeyOnlyFilter());
-      filters.addFilter(new KeyOnlyFilter());
-    } else {
-      boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-      for (int eachIndex : targetIndexes) {
-        if (isRowKeyMappings[eachIndex]) {
-          continue;
-        }
-        byte[][] mappingColumn = columnMapping.getMappingColumns()[eachIndex];
-        if (mappingColumn[1] == null) {
-          scan.addFamily(mappingColumn[0]);
-        } else {
-          scan.addColumn(mappingColumn[0], mappingColumn[1]);
-        }
-      }
-    }
-
-    scan.setStartRow(fragment.getStartRow());
-    if (fragment.isLast() && fragment.getStopRow() != null &&
-        fragment.getStopRow().length > 0) {
-      // last and stopRow is not empty
-      if (filters == null) {
-        filters = new FilterList();
-      }
-      filters.addFilter(new InclusiveStopFilter(fragment.getStopRow()));
-    } else {
-      scan.setStopRow(fragment.getStopRow());
-    }
-
-    if (filters != null) {
-      scan.setFilter(filters);
-    }
-
-    if (htable == null) {
-      HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
-          .getConnection(hbaseConf);
-      htable = hconn.getTable(fragment.getHbaseTableName());
-    }
-    scanner = htable.getScanner(scan);
-  }
-
-  @Override
-  public Tuple next() throws IOException {
-    if (finished.get()) {
-      return null;
-    }
-
-    if (scanResults == null || scanResultIndex >= scanResults.length) {
-      scanResults = scanner.next(scanFetchSize);
-      if (scanResults == null || scanResults.length == 0) {
-        finished.set(true);
-        progress = 1.0f;
-        return null;
-      }
-      scanResultIndex = 0;
-    }
-
-    Result result = scanResults[scanResultIndex++];
-    Tuple resultTuple = new VTuple(schema.size());
-    for (int i = 0; i < targetIndexes.length; i++) {
-      resultTuple.put(targetIndexes[i], getDatum(result, targetIndexes[i]));
-    }
-    numRows++;
-    return resultTuple;
-  }
-
-  private Datum getDatum(Result result, int fieldId) throws IOException {
-    byte[] value = null;
-    if (isRowKeyMappings[fieldId]) {
-      value = result.getRow();
-      if (!isBinaryColumns[fieldId] && rowKeyFieldIndexes[fieldId] >= 0) {
-        int rowKeyFieldIndex = rowKeyFieldIndexes[fieldId];
-
-        byte[][] rowKeyFields = BytesUtils.splitPreserveAllTokens(value, rowKeyDelimiter);
-
-        if (rowKeyFields.length <= rowKeyFieldIndex) { // <=: an index equal to the length would overflow below
-          return NullDatum.get();
-        } else {
-          value = rowKeyFields[rowKeyFieldIndex];
-        }
-      }
-    } else {
-      if (isColumnKeys[fieldId]) {
-        NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
-        if (cfMap != null) {
-          Set<byte[]> keySet = cfMap.keySet();
-          if (keySet.size() == 1) {
-            try {
-              return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], keySet.iterator().next());
-            } catch (Exception e) {
-              LOG.error(e.getMessage(), e);
-              throw new RuntimeException(e.getMessage(), e);
-            }
-          } else {
-            StringBuilder sb = new StringBuilder();
-            sb.append("[");
-            int count = 0;
-            for (byte[] eachKey : keySet) {
-              if (count > 0) {
-                sb.append(", ");
-              }
-              Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachKey);
-              sb.append("\"").append(datum.asChars()).append("\"");
-              count++;
-              if (count > MAX_LIST_SIZE) {
-                break;
-              }
-            }
-            sb.append("]");
-            return new TextDatum(sb.toString());
-          }
-        }
-      } else if (isColumnValues[fieldId]) {
-        NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
-        if (cfMap != null) {
-          Collection<byte[]> valueList = cfMap.values();
-          if (valueList.size() == 1) {
-            try {
-              return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], valueList.iterator().next());
-            } catch (Exception e) {
-              LOG.error(e.getMessage(), e);
-              throw new RuntimeException(e.getMessage(), e);
-            }
-          } else {
-            StringBuilder sb = new StringBuilder();
-            sb.append("[");
-            int count = 0;
-            for (byte[] eachValue : valueList) {
-              if (count > 0) {
-                sb.append(", ");
-              }
-              Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachValue);
-              sb.append("\"").append(datum.asChars()).append("\"");
-              count++;
-              if (count > MAX_LIST_SIZE) {
-                break;
-              }
-            }
-            sb.append("]");
-            return new TextDatum(sb.toString());
-          }
-        }
-      } else {
-        if (mappingColumnFamilies[fieldId][1] == null) {
-          NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
-          if (cfMap != null && !cfMap.isEmpty()) {
-            int count = 0;
-            String delim = "";
-
-            if (cfMap.size() == 0) {
-              return NullDatum.get();
-            } else if (cfMap.size() == 1) {
-              // If a column family is mapped without a column name (like "cf1:") and there is exactly one cell,
-              // the return value is a flat value, not JSON format.
-              NavigableMap.Entry<byte[], byte[]> entry = cfMap.entrySet().iterator().next();
-              byte[] entryKey = entry.getKey();
-              byte[] entryValue = entry.getValue();
-              if (entryKey == null || entryKey.length == 0) {
-                try {
-                  if (isBinaryColumns[fieldId]) {
-                    return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue);
-                  } else {
-                    return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue);
-                  }
-                } catch (Exception e) {
-                  LOG.error(e.getMessage(), e);
-                  throw new RuntimeException(e.getMessage(), e);
-                }
-              }
-            }
-            StringBuilder sb = new StringBuilder();
-            sb.append("{");
-            for (NavigableMap.Entry<byte[], byte[]> entry : cfMap.entrySet()) {
-              byte[] entryKey = entry.getKey();
-              byte[] entryValue = entry.getValue();
-
-              String keyText = new String(entryKey);
-              String valueText = null;
-              if (entryValue != null) {
-                try {
-                  if (isBinaryColumns[fieldId]) {
-                    valueText = HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars();
-                  } else {
-                    valueText = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars();
-                  }
-                } catch (Exception e) {
-                  LOG.error(e.getMessage(), e);
-                  throw new RuntimeException(e.getMessage(), e);
-                }
-              }
-              sb.append(delim).append("\"").append(keyText).append("\":\"").append(valueText).append("\"");
-              delim = ", ";
-              count++;
-              if (count > MAX_LIST_SIZE) {
-                break;
-              }
-            } //end of for
-            sb.append("}");
-            return new TextDatum(sb.toString());
-          } else {
-            value = null;
-          }
-        } else {
-          value = result.getValue(mappingColumnFamilies[fieldId][0], mappingColumnFamilies[fieldId][1]);
-        }
-      }
-    }
-
-    if (value == null) {
-      return NullDatum.get();
-    } else {
-      try {
-        if (isBinaryColumns[fieldId]) {
-          return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], value);
-        } else {
-          return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], value);
-        }
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new RuntimeException(e.getMessage(), e);
-      }
-    }
-  }
-
-  @Override
-  public void reset() throws IOException {
-    progress = 0.0f;
-    scanResultIndex = -1;
-    scanResults = null;
-    finished.set(false);
-    tableStats = new TableStats();
-
-    if (scanner != null) {
-      scanner.close();
-      scanner = null;
-    }
-
-    initScanner();
-  }
-
-  @Override
-  public void close() throws IOException {
-    progress = 1.0f;
-    finished.set(true);
-    if (scanner != null) {
-      try {
-        scanner.close();
-        scanner = null;
-      } catch (Exception e) {
-        LOG.warn("Error while closing hbase scanner: " + e.getMessage(), e);
-      }
-    }
-    if (htable != null) {
-      htable.close();
-      htable = null;
-    }
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return true;
-  }
-
-  @Override
-  public void setTarget(Column[] targets) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-    this.targets = targets;
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return false;
-  }
-
-  @Override
-  public void setSearchCondition(Object expr) {
-    // TODO: implement adding a column filter to the scanner.
-  }
-
-  @Override
-  public boolean isSplittable() {
-    return true;
-  }
-
-  @Override
-  public float getProgress() {
-    return progress;
-  }
-
-  @Override
-  public TableStats getInputStats() {
-    tableStats.setNumRows(numRows);
-    return tableStats;
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-}
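
The core of HBaseScanner.getDatum() above is the composite-rowkey handling: when several Tajo columns map to one HBase rowkey, the raw key is split on rowKeyDelimiter (preserving empty tokens, so field positions stay aligned with the column mapping) and the field at rowKeyFieldIndexes[fieldId] is deserialized. A minimal, self-contained sketch of that split behavior, assuming a one-byte delimiter; splitPreserveAllTokens below is a stand-in written for illustration, not the Tajo BytesUtils implementation itself:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowKeySplitSketch {
  // Splits a rowkey on a one-byte delimiter, keeping empty tokens so that
  // an empty or missing trailing field shows up as an empty token rather
  // than shifting the remaining fields out of position.
  static byte[][] splitPreserveAllTokens(byte[] rowKey, char delimiter) {
    List<byte[]> fields = new ArrayList<byte[]>();
    int start = 0;
    for (int i = 0; i < rowKey.length; i++) {
      if (rowKey[i] == (byte) delimiter) {
        fields.add(Arrays.copyOfRange(rowKey, start, i));
        start = i + 1;
      }
    }
    fields.add(Arrays.copyOfRange(rowKey, start, rowKey.length));
    return fields.toArray(new byte[fields.size()][]);
  }

  public static void main(String[] args) {
    // Hypothetical rowkey with an empty third field.
    byte[][] fields = splitPreserveAllTokens("20141203|seoul|".getBytes(), '|');
    for (byte[] field : fields) {
      System.out.println("[" + new String(field) + "]");  // [20141203] [seoul] []
    }
  }
}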

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
deleted file mode 100644
index 2c525a1..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-public interface HBaseStorageConstants {
-  public static final String KEY_COLUMN_MAPPING = "key";
-  public static final String VALUE_COLUMN_MAPPING = "value";
-  public static final String META_FETCH_ROWNUM_KEY = "fetch.rownum";
-  public static final String META_TABLE_KEY = "table";
-  public static final String META_COLUMNS_KEY = "columns";
-  public static final String META_SPLIT_ROW_KEYS_KEY = "hbase.split.rowkeys";
-  public static final String META_SPLIT_ROW_KEYS_FILE_KEY = "hbase.split.rowkeys.file";
-  public static final String META_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
-  public static final String META_ROWKEY_DELIMITER = "hbase.rowkey.delimiter";
-
-  public static final String INSERT_PUT_MODE = "tajo.hbase.insert.put.mode";
-}
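
The keys above are the WITH (...) options of an HBase-mapped Tajo table. A hedged sketch of how such an option set might look when assembled in code; the plain Map stands in for Tajo's option container, and the table name, column mapping, and quorum values are invented for the example:

import java.util.LinkedHashMap;
import java.util.Map;

public class HBaseTableOptionsSketch {
  public static void main(String[] args) {
    // Example option set; the keys come from HBaseStorageConstants, the values are made up.
    Map<String, String> options = new LinkedHashMap<String, String>();
    options.put("table", "blog_posts");                          // META_TABLE_KEY
    options.put("columns", ":key,info:title,info:body");         // META_COLUMNS_KEY
    options.put("hbase.zookeeper.quorum", "zk1:2181,zk2:2181");  // META_ZK_QUORUM_KEY
    options.put("hbase.rowkey.delimiter", "|");                  // META_ROWKEY_DELIMITER
    options.put("fetch.rownum", "1000");                         // META_FETCH_ROWNUM_KEY

    for (Map.Entry<String, String> option : options.entrySet()) {
      System.out.println(option.getKey() + " = " + option.getValue());
    }
  }
}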

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
deleted file mode 100644
index b47b98c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ /dev/null
@@ -1,1126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.rewrite.RewriteRule;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.*;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.*;
-
-/**
- * StorageManager for HBase table.
- */
-public class HBaseStorageManager extends StorageManager {
-  private static final Log LOG = LogFactory.getLog(HBaseStorageManager.class);
-
-  private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>();
-
-  public HBaseStorageManager(StoreType storeType) {
-    super(storeType);
-  }
-
-  @Override
-  public void storageInit() throws IOException {
-  }
-
-  @Override
-  public void closeStorageManager() {
-    synchronized (connMap) {
-      for (HConnection eachConn: connMap.values()) {
-        try {
-          eachConn.close();
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
-    createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
-    TableStats stats = new TableStats();
-    stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
-    tableDesc.setStats(stats);
-  }
-
-  private void createTable(TableMeta tableMeta, Schema schema,
-                           boolean isExternal, boolean ifNotExists) throws IOException {
-    String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, "");
-    if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) {
-      throw new IOException("HBase mapped table is required a '" +
-          HBaseStorageConstants.META_TABLE_KEY + "' attribute.");
-    }
-    TableName hTableName = TableName.valueOf(hbaseTableName);
-
-    String mappedColumns = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
-    if (mappedColumns != null && mappedColumns.split(",").length > schema.size()) {
-      throw new IOException("Columns property has more entry than Tajo table columns");
-    }
-
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
-    int numRowKeys = 0;
-    boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-    for (int i = 0; i < isRowKeyMappings.length; i++) {
-      if (isRowKeyMappings[i]) {
-        numRowKeys++;
-      }
-    }
-    if (numRowKeys > 1) {
-      for (int i = 0; i < isRowKeyMappings.length; i++) {
-        if (isRowKeyMappings[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) {
-          throw new IOException("Key field type should be TEXT type.");
-        }
-      }
-    }
-
-    for (int i = 0; i < isRowKeyMappings.length; i++) {
-      if (columnMapping.getIsColumnKeys()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) {
-        throw new IOException("Column key field('<cfname>:key:') type should be TEXT type.");
-      }
-      if (columnMapping.getIsColumnValues()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) {
-        throw new IOException("Column value field(('<cfname>:value:') type should be TEXT type.");
-      }
-    }
-
-    Configuration hConf = getHBaseConfiguration(conf, tableMeta);
-    HBaseAdmin hAdmin =  new HBaseAdmin(hConf);
-
-    try {
-      if (isExternal) {
-        // If the Tajo table is an external table, only validate the mapping.
-        if (mappedColumns == null || mappedColumns.isEmpty()) {
-          throw new IOException("HBase mapped table is required a '" +
-              HBaseStorageConstants.META_COLUMNS_KEY + "' attribute.");
-        }
-        if (!hAdmin.tableExists(hTableName)) {
-          throw new IOException("HBase table [" + hbaseTableName + "] not exists. " +
-              "External table should be a existed table.");
-        }
-        HTableDescriptor hTableDescriptor = hAdmin.getTableDescriptor(hTableName);
-        Set<String> tableColumnFamilies = new HashSet<String>();
-        for (HColumnDescriptor eachColumn : hTableDescriptor.getColumnFamilies()) {
-          tableColumnFamilies.add(eachColumn.getNameAsString());
-        }
-
-        Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames();
-        if (mappingColumnFamilies.isEmpty()) {
-          throw new IOException("HBase mapped table is required a '" +
-              HBaseStorageConstants.META_COLUMNS_KEY + "' attribute.");
-        }
-
-        for (String eachMappingColumnFamily : mappingColumnFamilies) {
-          if (!tableColumnFamilies.contains(eachMappingColumnFamily)) {
-            throw new IOException("There is no " + eachMappingColumnFamily + " column family in " + hbaseTableName);
-          }
-        }
-      } else {
-        if (hAdmin.tableExists(hbaseTableName)) {
-          if (ifNotExists) {
-            return;
-          } else {
-            throw new IOException("HBase table [" + hbaseTableName + "] already exists.");
-          }
-        }
-        // Creating hbase table
-        HTableDescriptor hTableDescriptor = parseHTableDescriptor(tableMeta, schema);
-
-        byte[][] splitKeys = getSplitKeys(conf, schema, tableMeta);
-        if (splitKeys == null) {
-          hAdmin.createTable(hTableDescriptor);
-        } else {
-          hAdmin.createTable(hTableDescriptor, splitKeys);
-        }
-      }
-    } finally {
-      hAdmin.close();
-    }
-  }
-
-  /**
-   * Returns the initial region split keys.
-   *
-   * @param conf Tajo configuration
-   * @param schema table schema
-   * @param meta table meta carrying the split-key options
-   * @return split keys, or null if no split option is set
-   * @throws IOException
-   */
-  private byte[][] getSplitKeys(TajoConf conf, Schema schema, TableMeta meta) throws IOException {
-    String splitRowKeys = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_KEY, "");
-    String splitRowKeysFile = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_FILE_KEY, "");
-
-    if ((splitRowKeys == null || splitRowKeys.isEmpty()) &&
-        (splitRowKeysFile == null || splitRowKeysFile.isEmpty())) {
-      return null;
-    }
-
-    ColumnMapping columnMapping = new ColumnMapping(schema, meta);
-    boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns();
-    boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
-
-    boolean rowkeyBinary = false;
-    int numRowKeys = 0;
-    Column rowKeyColumn = null;
-    for (int i = 0; i < isBinaryColumns.length; i++) {
-      if (isBinaryColumns[i] && isRowKeys[i]) {
-        rowkeyBinary = true;
-      }
-      if (isRowKeys[i]) {
-        numRowKeys++;
-        rowKeyColumn = schema.getColumn(i);
-      }
-    }
-
-    if (rowkeyBinary && numRowKeys > 1) {
-      throw new IOException("If rowkey is mapped to multi column and a rowkey is binary, " +
-          "Multiple region for creation is not support.");
-    }
-
-    if (splitRowKeys != null && !splitRowKeys.isEmpty()) {
-      String[] splitKeyTokens = splitRowKeys.split(",");
-      byte[][] splitKeys = new byte[splitKeyTokens.length][];
-      for (int i = 0; i < splitKeyTokens.length; i++) {
-        if (numRowKeys == 1 && rowkeyBinary) {
-          splitKeys[i] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i]));
-        } else {
-          splitKeys[i] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i]));
-        }
-      }
-      return splitKeys;
-    }
-
-    if (splitRowKeysFile != null && !splitRowKeysFile.isEmpty()) {
-      // If there are many split keys, Tajo allows them to be defined in a file.
-      Path path = new Path(splitRowKeysFile);
-      FileSystem fs = path.getFileSystem(conf);
-      if (!fs.exists(path)) {
-        throw new IOException("hbase.split.rowkeys.file=" + path.toString() + " not exists.");
-      }
-
-      SortedSet<String> splitKeySet = new TreeSet<String>();
-      BufferedReader reader = null;
-      try {
-        reader = new BufferedReader(new InputStreamReader(fs.open(path)));
-        String line = null;
-        while ( (line = reader.readLine()) != null ) {
-          if (line.isEmpty()) {
-            continue;
-          }
-          splitKeySet.add(line);
-        }
-      } finally {
-        if (reader != null) {
-          reader.close();
-        }
-      }
-
-      if (splitKeySet.isEmpty()) {
-        return null;
-      }
-
-      byte[][] splitKeys = new byte[splitKeySet.size()][];
-      int index = 0;
-      for (String eachKey: splitKeySet) {
-        if (numRowKeys == 1 && rowkeyBinary) {
-          splitKeys[index++] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey));
-        } else {
-          splitKeys[index++] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey));
-        }
-      }
-
-      return splitKeys;
-    }
-
-    return null;
-  }
-
-  /**
-   * Creates a Configuration instance and sets the hbase connection options.
-   *
-   * @param conf base configuration (may be null)
-   * @param tableMeta table meta carrying the connection options
-   * @return an HBase configuration for the mapped table
-   * @throws IOException
-   */
-  public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException {
-    String zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, "");
-    if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
-      throw new IOException("HBase mapped table is required a '" +
-          HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute.");
-    }
-
-    Configuration hbaseConf = (conf == null) ? HBaseConfiguration.create() : HBaseConfiguration.create(conf);
-    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
-
-    for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) {
-      String key = eachOption.getKey();
-      if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
-        hbaseConf.set(key, eachOption.getValue());
-      }
-    }
-    return hbaseConf;
-  }
-
-  /**
-   * Creates an HTableDescriptor using the table meta data.
-   *
-   * @param tableMeta table meta carrying the hbase table name
-   * @param schema table schema used for the column-family mapping
-   * @return the HTableDescriptor for the mapped table
-   * @throws IOException
-   */
-  public static HTableDescriptor parseHTableDescriptor(TableMeta tableMeta, Schema schema) throws IOException {
-    String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, "");
-    if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) {
-      throw new IOException("HBase mapped table is required a '" +
-          HBaseStorageConstants.META_TABLE_KEY + "' attribute.");
-    }
-    TableName hTableName = TableName.valueOf(hbaseTableName);
-
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
-
-    HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName);
-
-    Collection<String> columnFamilies = columnMapping.getColumnFamilyNames();
-    // If the 'columns' attribute is empty, Tajo table columns are mapped to all HBase table columns.
-    if (columnFamilies.isEmpty()) {
-      for (Column eachColumn: schema.getColumns()) {
-        columnFamilies.add(eachColumn.getSimpleName());
-      }
-    }
-
-    for (String eachColumnFamily: columnFamilies) {
-      hTableDescriptor.addFamily(new HColumnDescriptor(eachColumnFamily));
-    }
-
-    return hTableDescriptor;
-  }
-
-  @Override
-  public void purgeTable(TableDesc tableDesc) throws IOException {
-    HBaseAdmin hAdmin =  new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta()));
-
-    try {
-      HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema());
-      LOG.info("Deleting hbase table: " + new String(hTableDesc.getName()));
-      hAdmin.disableTable(hTableDesc.getName());
-      hAdmin.deleteTable(hTableDesc.getName());
-    } finally {
-      hAdmin.close();
-    }
-  }
-
-  /**
-   * Returns the columns which are mapped to the rowkey of the hbase table.
-   *
-   * @param tableDesc table description
-   * @return the indexable (rowkey-mapped) columns
-   * @throws IOException
-   */
-  private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException {
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-    boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-    int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes();
-
-    Column indexColumn = null;
-    for (int i = 0; i < isRowKeyMappings.length; i++) {
-      if (isRowKeyMappings[i]) {
-        if (columnMapping.getNumRowKeys() == 1 ||
-            rowKeyIndexes[i] == 0) {
-          indexColumn = tableDesc.getSchema().getColumn(i);
-        }
-      }
-    }
-    return new Column[]{indexColumn};
-  }
-
-  @Override
-  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException {
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-
-    List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode);
-    Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
-    HTable htable = null;
-    HBaseAdmin hAdmin = null;
-
-    try {
-      htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
-
-      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
-      if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-        HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
-        if (null == regLoc) {
-          throw new IOException("Expecting at least one region.");
-        }
-        List<Fragment> fragments = new ArrayList<Fragment>(1);
-        Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
-            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname());
-        fragments.add(fragment);
-        return fragments;
-      }
-
-      List<byte[]> startRows;
-      List<byte[]> stopRows;
-
-      if (indexPredications != null && !indexPredications.isEmpty()) {
-        // indexPredications is a disjunctive set
-        startRows = new ArrayList<byte[]>();
-        stopRows = new ArrayList<byte[]>();
-        for (IndexPredication indexPredication: indexPredications) {
-          byte[] startRow;
-          byte[] stopRow;
-          if (indexPredication.getStartValue() != null) {
-            startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue());
-          } else {
-            startRow = HConstants.EMPTY_START_ROW;
-          }
-          if (indexPredication.getStopValue() != null) {
-            stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue());
-          } else {
-            stopRow = HConstants.EMPTY_END_ROW;
-          }
-          startRows.add(startRow);
-          stopRows.add(stopRow);
-        }
-      } else {
-        startRows = TUtil.newList(HConstants.EMPTY_START_ROW);
-        stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
-      }
-
-      hAdmin =  new HBaseAdmin(hconf);
-      Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
-
-      // region startkey -> HBaseFragment
-      Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>();
-      for (int i = 0; i < keys.getFirst().length; i++) {
-        HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
-
-        byte[] regionStartKey = keys.getFirst()[i];
-        byte[] regionStopKey = keys.getSecond()[i];
-
-        int startRowsSize = startRows.size();
-        for (int j = 0; j < startRowsSize; j++) {
-          byte[] startRow = startRows.get(j);
-          byte[] stopRow = stopRows.get(j);
-          // determine if the given start and stop keys fall into the region
-          if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
-              && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
-            byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
-                regionStartKey : startRow;
-
-            byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
-                regionStopKey.length > 0 ? regionStopKey : stopRow;
-
-            String regionName = location.getRegionInfo().getRegionNameAsString();
-
-            ServerLoad serverLoad = serverLoadMap.get(location.getServerName());
-            if (serverLoad == null) {
-              serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName());
-              serverLoadMap.put(location.getServerName(), serverLoad);
-            }
-
-            if (fragmentMap.containsKey(regionStartKey)) {
-              HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
-              if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
-                prevFragment.setStartRow(fragmentStart);
-              }
-              if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
-                prevFragment.setStopRow(fragmentStop);
-              }
-            } else {
-              HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
-                  fragmentStart, fragmentStop, location.getHostname());
-
-              // get region size
-              boolean foundLength = false;
-              for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) {
-                if (regionName.equals(Bytes.toString(entry.getKey()))) {
-                  RegionLoad regionLoad = entry.getValue();
-                  long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L;
-                  fragment.setLength(storeFileSize);
-                  foundLength = true;
-                  break;
-                }
-              }
-
-              if (!foundLength) {
-                fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
-              }
-
-              fragmentMap.put(regionStartKey, fragment);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
-              }
-            }
-          }
-        }
-      }
-
-      List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values());
-      Collections.sort(fragments);
-      if (!fragments.isEmpty()) {
-        fragments.get(fragments.size() - 1).setLast(true);
-      }
-      return (ArrayList<Fragment>) (ArrayList) fragments;
-    } finally {
-      if (htable != null) {
-        htable.close();
-      }
-      if (hAdmin != null) {
-        hAdmin.close();
-      }
-    }
-  }
-
-  private byte[] serialize(ColumnMapping columnMapping,
-                           IndexPredication indexPredication, Datum datum) throws IOException {
-    if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) {
-      return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum);
-    } else {
-      return HBaseTextSerializerDeserializer.serialize(indexPredication.getColumn(), datum);
-    }
-  }
-
-  @Override
-  public Appender getAppender(OverridableConf queryContext,
-                              QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
-      throws IOException {
-    if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
-      return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir);
-    } else {
-      return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir);
-    }
-  }
-
-  @Override
-  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
-      throws IOException {
-    Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
-    HTable htable = null;
-    HBaseAdmin hAdmin = null;
-    try {
-      htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
-
-      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
-      if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-        return new ArrayList<Fragment>(1);
-      }
-      hAdmin =  new HBaseAdmin(hconf);
-      Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
-
-      List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length);
-
-      int start = currentPage * numFragments;
-      if (start >= keys.getFirst().length) {
-        return new ArrayList<Fragment>(1);
-      }
-      int end = (currentPage + 1) * numFragments;
-      if (end > keys.getFirst().length) {
-        end = keys.getFirst().length;
-      }
-      for (int i = start; i < end; i++) {
-        HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
-
-        String regionName = location.getRegionInfo().getRegionNameAsString();
-        ServerLoad serverLoad = serverLoadMap.get(location.getServerName());
-        if (serverLoad == null) {
-          serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName());
-          serverLoadMap.put(location.getServerName(), serverLoad);
-        }
-
-        HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(),
-            location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname());
-
-        // get region size
-        boolean foundLength = false;
-        for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) {
-          if (regionName.equals(Bytes.toString(entry.getKey()))) {
-            RegionLoad regionLoad = entry.getValue();
-            long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L;
-            if (storeLength == 0) {
-              // If store size is smaller than 1 MB, storeLength is zero
-              storeLength = 1 * 1024 * 1024;  //default 1MB
-            }
-            fragment.setLength(storeLength);
-            foundLength = true;
-            break;
-          }
-        }
-
-        if (!foundLength) {
-          fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
-        }
-
-        fragments.add(fragment);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
-        }
-      }
-
-      if (!fragments.isEmpty()) {
-        ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true);
-      }
-      return fragments;
-    } finally {
-      if (htable != null) {
-        htable.close();
-      }
-      if (hAdmin != null) {
-        hAdmin.close();
-      }
-    }
-  }
-
-  public HConnection getConnection(Configuration hbaseConf) throws IOException {
-    synchronized(connMap) {
-      HConnectionKey key = new HConnectionKey(hbaseConf);
-      HConnection conn = connMap.get(key);
-      if (conn == null) {
-        conn = HConnectionManager.createConnection(hbaseConf);
-        connMap.put(key, conn);
-      }
-
-      return conn;
-    }
-  }
-
-  static class HConnectionKey {
-    final static String[] CONNECTION_PROPERTIES = new String[] {
-        HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
-        HConstants.ZOOKEEPER_CLIENT_PORT,
-        HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
-        HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
-        HConstants.HBASE_META_SCANNER_CACHING,
-        HConstants.HBASE_CLIENT_INSTANCE_ID,
-        HConstants.RPC_CODEC_CONF_KEY };
-
-    private Map<String, String> properties;
-    private String username;
-
-    HConnectionKey(Configuration conf) {
-      Map<String, String> m = new HashMap<String, String>();
-      if (conf != null) {
-        for (String property : CONNECTION_PROPERTIES) {
-          String value = conf.get(property);
-          if (value != null) {
-            m.put(property, value);
-          }
-        }
-      }
-      this.properties = Collections.unmodifiableMap(m);
-
-      try {
-        UserProvider provider = UserProvider.instantiate(conf);
-        User currentUser = provider.getCurrent();
-        if (currentUser != null) {
-          username = currentUser.getName();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      if (username != null) {
-        result = username.hashCode();
-      }
-      for (String property : CONNECTION_PROPERTIES) {
-        String value = properties.get(property);
-        if (value != null) {
-          result = prime * result + value.hashCode();
-        }
-      }
-
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      HConnectionKey that = (HConnectionKey) obj;
-      if (this.username != null && !this.username.equals(that.username)) {
-        return false;
-      } else if (this.username == null && that.username != null) {
-        return false;
-      }
-      if (this.properties == null) {
-        if (that.properties != null) {
-          return false;
-        }
-      } else {
-        if (that.properties == null) {
-          return false;
-        }
-        for (String property : CONNECTION_PROPERTIES) {
-          String thisValue = this.properties.get(property);
-          String thatValue = that.properties.get(property);
-          //noinspection StringEquality
-          if (thisValue == thatValue) {
-            continue;
-          }
-          if (thisValue == null || !thisValue.equals(thatValue)) {
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return "HConnectionKey{" +
-          "properties=" + properties +
-          ", username='" + username + '\'' +
-          '}';
-    }
-  }
-
-  public List<IndexPredication> getIndexPredications(ColumnMapping columnMapping,
-                                                     TableDesc tableDesc, ScanNode scanNode) throws IOException {
-    List<IndexPredication> indexPredications = new ArrayList<IndexPredication>();
-    Column[] indexableColumns = getIndexableColumns(tableDesc);
-    if (indexableColumns != null && indexableColumns.length == 1) {
-      // Currently supports only a single index column.
-      List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(scanNode, indexableColumns);
-      for (Set<EvalNode> eachEvalSet: indexablePredicateList) {
-        Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet);
-        if (indexPredicationValues != null) {
-          IndexPredication indexPredication = new IndexPredication();
-          indexPredication.setColumn(indexableColumns[0]);
-          indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()));
-          indexPredication.setStartValue(indexPredicationValues.getFirst());
-          indexPredication.setStopValue(indexPredicationValues.getSecond());
-
-          indexPredications.add(indexPredication);
-        }
-      }
-    }
-    return indexPredications;
-  }
-
-  public List<Set<EvalNode>> findIndexablePredicateSet(ScanNode scanNode, Column[] indexableColumns) throws IOException {
-    List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>();
-
-    // if a query statement has a search condition, try to find indexable predicates
-    if (indexableColumns != null && scanNode.getQual() != null) {
-      EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(scanNode.getQual());
-
-      // add qualifier to schema for qual
-      for (Column column : indexableColumns) {
-        for (EvalNode disjunctiveExpr : disjunctiveForms) {
-          EvalNode[] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(disjunctiveExpr);
-          Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
-          for (EvalNode conjunctiveExpr : conjunctiveForms) {
-            if (checkIfIndexablePredicateOnTargetColumn(conjunctiveExpr, column)) {
-              indexablePredicateSet.add(conjunctiveExpr);
-            }
-          }
-          if (!indexablePredicateSet.isEmpty()) {
-            indexablePredicateList.add(indexablePredicateSet);
-          }
-        }
-      }
-    }
-
-    return indexablePredicateList;
-  }
-
-  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
-    if (checkIfIndexablePredicate(evalNode) || checkIfConjunctiveButOneVariable(evalNode)) {
-      Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
-      // if it contains only single variable matched to a target column
-      return variables.size() == 1 && variables.contains(targetColumn);
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Checks whether the expression is a conjunction of indexable predicates
-   * over the same single variable.
-   *
-   * @param evalNode The expression to be checked
-   * @return true if the expression is a conjunctive expression consisting of indexable expressions
-   */
-  private boolean checkIfConjunctiveButOneVariable(EvalNode evalNode) {
-    if (evalNode.getType() == EvalType.AND) {
-      BinaryEval andEval = (BinaryEval) evalNode;
-      boolean indexable =
-          checkIfIndexablePredicate(andEval.getLeftExpr()) &&
-              checkIfIndexablePredicate(andEval.getRightExpr());
-
-      boolean sameVariable =
-          EvalTreeUtil.findUniqueColumns(andEval.getLeftExpr())
-              .equals(EvalTreeUtil.findUniqueColumns(andEval.getRightExpr()));
-
-      return indexable && sameVariable;
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Check if an expression consists of one variable and one constant and
-   * the expression is a comparison operator.
-   *
-   * @param evalNode The expression to be checked
-   * @return true if an expression consists of one variable and one constant
-   * and the expression is a comparison operator. Otherwise, false.
-   */
-  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
-    return AlgebraicUtil.containSingleVar(evalNode) && isIndexableOperator(evalNode);
-  }
-
-  public static boolean isIndexableOperator(EvalNode expr) {
-    return expr.getType() == EvalType.EQUAL ||
-        expr.getType() == EvalType.LEQ ||
-        expr.getType() == EvalType.LTH ||
-        expr.getType() == EvalType.GEQ ||
-        expr.getType() == EvalType.GTH ||
-        expr.getType() == EvalType.BETWEEN;
-  }
-
-  public Pair<Datum, Datum> getIndexablePredicateValue(ColumnMapping columnMapping,
-                                                       Set<EvalNode> evalNodes) {
-    Datum startDatum = null;
-    Datum endDatum = null;
-    for (EvalNode evalNode: evalNodes) {
-      if (evalNode instanceof BinaryEval) {
-        BinaryEval binaryEval = (BinaryEval) evalNode;
-        EvalNode left = binaryEval.getLeftExpr();
-        EvalNode right = binaryEval.getRightExpr();
-
-        Datum constValue = null;
-        if (left.getType() == EvalType.CONST) {
-          constValue = ((ConstEval) left).getValue();
-        } else if (right.getType() == EvalType.CONST) {
-          constValue = ((ConstEval) right).getValue();
-        }
-
-        if (constValue != null) {
-          if (evalNode.getType() == EvalType.EQUAL ||
-              evalNode.getType() == EvalType.GEQ ||
-              evalNode.getType() == EvalType.GTH) {
-            if (startDatum != null) {
-              if (constValue.compareTo(startDatum) > 0) {
-                startDatum = constValue;
-              }
-            } else {
-              startDatum = constValue;
-            }
-          }
-
-          if (evalNode.getType() == EvalType.EQUAL ||
-              evalNode.getType() == EvalType.LEQ ||
-              evalNode.getType() == EvalType.LTH) {
-            if (endDatum != null) {
-              if (constValue.compareTo(endDatum) < 0) {
-                endDatum = constValue;
-              }
-            } else {
-              endDatum = constValue;
-            }
-          }
-        }
-      } else if (evalNode instanceof BetweenPredicateEval) {
-        BetweenPredicateEval betweenEval = (BetweenPredicateEval) evalNode;
-        if (betweenEval.getBegin().getType() == EvalType.CONST && betweenEval.getEnd().getType() == EvalType.CONST) {
-          Datum value = ((ConstEval) betweenEval.getBegin()).getValue();
-          if (startDatum != null) {
-            if (value.compareTo(startDatum) > 0) {
-              startDatum = value;
-            }
-          } else {
-            startDatum = value;
-          }
-
-          value = ((ConstEval) betweenEval.getEnd()).getValue();
-          if (endDatum != null) {
-            if (value.compareTo(endDatum) < 0) {
-              endDatum = value;
-            }
-          } else {
-            endDatum = value;
-          }
-        }
-      }
-    }
-
-    if (endDatum != null && columnMapping != null && columnMapping.getNumRowKeys() > 1) {
-      endDatum = new TextDatum(endDatum.asChars() +
-          new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE}));
-    }
-    if (startDatum != null || endDatum != null) {
-      return new Pair<Datum, Datum>(startDatum, endDatum);
-    } else {
-      return null;
-    }
-  }
-
-  @Override
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException {
-    if (tableDesc == null) {
-      throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
-    }
-    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
-    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-
-    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
-    hbaseConf.set("hbase.loadincremental.threads.max", "2");
-
-    JobContextImpl jobContext = new JobContextImpl(hbaseConf,
-        new JobID(finalEbId.getQueryId().toString(), finalEbId.getId()));
-
-    FileOutputCommitter committer = new FileOutputCommitter(stagingResultDir, jobContext);
-    Path jobAttemptPath = committer.getJobAttemptPath(jobContext);
-    FileSystem fs = jobAttemptPath.getFileSystem(queryContext.getConf());
-    if (!fs.exists(jobAttemptPath) || fs.listStatus(jobAttemptPath) == null) {
-      LOG.warn("No query attempt file in " + jobAttemptPath);
-      return stagingResultDir;
-    }
-    committer.commitJob(jobContext);
-
-    if (tableDesc.getName() == null && tableDesc.getPath() != null) {
-
-      // insert into location
-      return super.commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, false);
-    } else {
-      // insert into table
-      String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY);
-
-      HTable htable = new HTable(hbaseConf, tableName);
-      try {
-        LoadIncrementalHFiles loadIncrementalHFiles = null;
-        try {
-          loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf);
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          throw new IOException(e.getMessage(), e);
-        }
-        loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable);
-
-        return stagingResultDir;
-      } finally {
-        htable.close();
-      }
-    }
-  }
-
-  @Override
-  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
-                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
-      throws IOException {
-    try {
-      int[] sortKeyIndexes = new int[sortSpecs.length];
-      for (int i = 0; i < sortSpecs.length; i++) {
-        sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
-      }
-
-      ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-      Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
-
-      HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName());
-      try {
-        byte[][] endKeys = htable.getEndKeys();
-        if (endKeys.length == 1) {
-          return new TupleRange[]{dataRange};
-        }
-        List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length);
-
-        TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs);
-        Tuple previousTuple = dataRange.getStart();
-
-        for (byte[] eachEndKey : endKeys) {
-          Tuple endTuple = new VTuple(sortSpecs.length);
-          byte[][] rowKeyFields;
-          if (sortSpecs.length > 1) {
-            byte[][] splitValues = BytesUtils.splitPreserveAllTokens(eachEndKey, columnMapping.getRowKeyDelimiter());
-            if (splitValues.length == sortSpecs.length) {
-              rowKeyFields = splitValues;
-            } else {
-              rowKeyFields = new byte[sortSpecs.length][];
-              for (int j = 0; j < sortSpecs.length; j++) {
-                if (j < splitValues.length) {
-                  rowKeyFields[j] = splitValues[j];
-                } else {
-                  rowKeyFields[j] = null;
-                }
-              }
-            }
-
-          } else {
-            rowKeyFields = new byte[1][];
-            rowKeyFields[0] = eachEndKey;
-          }
-
-          for (int i = 0; i < sortSpecs.length; i++) {
-            if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) {
-              endTuple.put(i,
-                  HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
-                      rowKeyFields[i]));
-            } else {
-              endTuple.put(i,
-                  HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
-                      rowKeyFields[i]));
-            }
-          }
-          tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple));
-          previousTuple = endTuple;
-        }
-
-        // The last region's end key is empty. Tajo ignores empty keys, so the end key is replaced with the maximum data value.
-        if (comparator.compare(dataRange.getEnd(), tupleRanges.get(tupleRanges.size() - 1).getStart()) >= 0) {
-          tupleRanges.get(tupleRanges.size() - 1).setEnd(dataRange.getEnd());
-        } else {
-          tupleRanges.remove(tupleRanges.size() - 1);
-        }
-        return tupleRanges.toArray(new TupleRange[]{});
-      } finally {
-        htable.close();
-      }
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-      throw new IOException(t.getMessage(), t);
-    }
-  }
-
-  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
-    if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
-      List<RewriteRule> rules = new ArrayList<RewriteRule>();
-      rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
-      return rules;
-    } else {
-      return null;
-    }
-  }
-
-  private Column[] getIndexColumns(TableDesc tableDesc) throws IOException {
-    List<Column> indexColumns = new ArrayList<Column>();
-
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-
-    boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
-    for (int i = 0; i < isRowKeys.length; i++) {
-      if (isRowKeys[i]) {
-        indexColumns.add(tableDesc.getSchema().getColumn(i));
-      }
-    }
-
-    return indexColumns.toArray(new Column[]{});
-  }
-
-  @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty storageProperty = new StorageProperty();
-    storageProperty.setSortedInsert(true);
-    storageProperty.setSupportsInsertInto(true);
-    return storageProperty;
-  }
-
-  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
-    if (node.getType() == NodeType.CREATE_TABLE) {
-      CreateTableNode cNode = (CreateTableNode)node;
-      if (!cNode.isExternal()) {
-        TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
-        createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists());
-      }
-    }
-  }
-
-  @Override
-  public void rollbackOutputCommit(LogicalNode node) throws IOException {
-    if (node.getType() == NodeType.CREATE_TABLE) {
-      CreateTableNode cNode = (CreateTableNode)node;
-      if (cNode.isExternal()) {
-        return;
-      }
-      TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
-      HBaseAdmin hAdmin =  new HBaseAdmin(getHBaseConfiguration(conf, tableMeta));
-
-      try {
-        HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema());
-        LOG.info("Delete table cause query failed:" + hTableDesc.getName());
-        hAdmin.disableTable(hTableDesc.getName());
-        hAdmin.deleteTable(hTableDesc.getName());
-      } finally {
-        hAdmin.close();
-      }
-    }
-  }
-
-  @Override
-  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException  {
-    if (tableDesc != null) {
-      Schema tableSchema = tableDesc.getSchema();
-      if (tableSchema.size() != outSchema.size()) {
-        throw new IOException("The number of table columns is different from SELECT columns");
-      }
-
-      for (int i = 0; i < tableSchema.size(); i++) {
-        if (!tableSchema.getColumn(i).getDataType().equals(outSchema.getColumn(i).getDataType())) {
-          throw new IOException(outSchema.getColumn(i).getQualifiedName() +
-              "(" + outSchema.getColumn(i).getDataType().getType() + ")" +
-              " is different column type with " + tableSchema.getColumn(i).getSimpleName() +
-              "(" + tableSchema.getColumn(i).getDataType().getType() + ")");
-        }
-      }
-    }
-  }
-}
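For reference, getInsertSortRanges() above depends on splitting a composite row key on its delimiter while preserving empty tokens, so each sort field keeps its position even when a field is absent. Below is a minimal, self-contained sketch of that splitting step; it uses only the JDK, and the helper name splitPreserveAllTokens mirrors BytesUtils but is re-implemented here purely for illustration.

    import java.util.ArrayList;
    import java.util.List;

    public class RowKeySplitExample {
      // Hypothetical stand-in for BytesUtils.splitPreserveAllTokens: splits a row
      // key on a single-byte delimiter, keeping empty tokens so that field
      // positions stay stable.
      static byte[][] splitPreserveAllTokens(byte[] rowKey, char delimiter) {
        List<byte[]> fields = new ArrayList<byte[]>();
        int start = 0;
        for (int i = 0; i <= rowKey.length; i++) {
          if (i == rowKey.length || rowKey[i] == (byte) delimiter) {
            byte[] field = new byte[i - start];
            System.arraycopy(rowKey, start, field, 0, field.length);
            fields.add(field);
            start = i + 1;
          }
        }
        return fields.toArray(new byte[fields.size()][]);
      }

      public static void main(String[] args) {
        byte[][] fields = splitPreserveAllTokens("us|2014||seoul".getBytes(), '|');
        System.out.println(fields.length); // prints 4: the empty third field is preserved
      }
    }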

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
deleted file mode 100644
index a0ad492..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.util.NumberUtil;
-
-import java.io.IOException;
-
-public class HBaseTextSerializerDeserializer {
-  public static Datum deserialize(Column col, byte[] bytes) throws IOException {
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case INT1:
-      case INT2:
-        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
-            DatumFactory.createInt2((short)NumberUtil.parseInt(bytes, 0, bytes.length));
-        break;
-      case INT4:
-        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
-            DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length));
-        break;
-      case INT8:
-        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
-            DatumFactory.createInt8(new String(bytes, 0, bytes.length));
-        break;
-      case FLOAT4:
-        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
-            DatumFactory.createFloat4(new String(bytes, 0, bytes.length));
-        break;
-      case FLOAT8:
-        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
-            DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length));
-        break;
-      case TEXT:
-        datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
-        break;
-      default:
-        datum = NullDatum.get();
-        break;
-    }
-    return datum;
-  }
-
-  public static byte[] serialize(Column col, Datum datum) throws IOException {
-    if (datum == null || datum instanceof NullDatum) {
-      return null;
-    }
-
-    return datum.asChars().getBytes();
-  }
-}
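The switch in deserialize() above encodes a single convention: numeric values travel as their decimal string bytes, and a null or empty cell maps to SQL NULL. A minimal sketch of that convention for INT4, with plain JDK types standing in for Tajo's Datum classes:

    import java.nio.charset.StandardCharsets;

    public class TextSerDeExample {
      // Serialize: a number becomes the bytes of its decimal string; null stays null.
      static byte[] serializeInt4(Integer value) {
        return value == null ? null : Integer.toString(value).getBytes(StandardCharsets.UTF_8);
      }

      // Deserialize: null or empty bytes mean SQL NULL, otherwise parse the text.
      static Integer deserializeInt4(byte[] bytes) {
        return (bytes == null || bytes.length == 0) ? null
            : Integer.valueOf(new String(bytes, StandardCharsets.UTF_8));
      }

      public static void main(String[] args) {
        System.out.println(deserializeInt4(serializeInt4(42))); // 42
        System.out.println(deserializeInt4(new byte[0]));       // null (SQL NULL)
      }
    }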

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
deleted file mode 100644
index b9425f9..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.util.TreeSet;
-
-public class HFileAppender extends AbstractHBaseAppender {
-  private static final Log LOG = LogFactory.getLog(HFileAppender.class);
-
-  private RecordWriter<ImmutableBytesWritable, Cell> writer;
-  private TaskAttemptContext writerContext;
-  private Path workingFilePath;
-  private FileOutputCommitter committer;
-
-  public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
-                       Schema schema, TableMeta meta, Path stagingDir) {
-    super(conf, taskAttemptId, schema, meta, stagingDir);
-  }
-
-  @Override
-  public void init() throws IOException {
-    super.init();
-
-    Configuration taskConf = new Configuration();
-    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-    taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
-
-    ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId();
-    writerContext = new TaskAttemptContextImpl(taskConf,
-        new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
-            taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId()));
-
-    HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
-    try {
-      writer = hFileOutputFormat2.getRecordWriter(writerContext);
-
-      committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
-      workingFilePath = committer.getWorkPath();
-    } catch (InterruptedException e) {
-      throw new IOException(e.getMessage(), e);
-    }
-
-    LOG.info("Created hbase file writer: " + workingFilePath);
-  }
-
-  long totalNumBytes = 0;
-  ImmutableBytesWritable keyWritable = new ImmutableBytesWritable();
-  boolean first = true;
-  TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-
-
-  @Override
-  public void addTuple(Tuple tuple) throws IOException {
-    Datum datum;
-
-    byte[] rowkey = getRowKeyBytes(tuple);
-
-    if (!first && !Bytes.equals(keyWritable.get(), 0, keyWritable.getLength(), rowkey, 0, rowkey.length)) {
-      try {
-        for (KeyValue kv : kvSet) {
-          writer.write(keyWritable, kv);
-          totalNumBytes += keyWritable.getLength() + kv.getLength();
-        }
-        kvSet.clear();
-        // Update row statistics
-        if (enabledStats) {
-          stats.incrementRow();
-        }
-      } catch (InterruptedException e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
-
-    first = false;
-
-    keyWritable.set(rowkey);
-
-    readKeyValues(tuple, rowkey);
-    if (keyValues != null) {
-      for (KeyValue eachKeyVal: keyValues) {
-        kvSet.add(eachKeyVal);
-      }
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-  }
-
-  @Override
-  public long getEstimatedOutputSize() throws IOException {
-    // StoreTableExec uses this value as the rolling file length.
-    // HFileAppender does not roll files, so always report 0.
-    return 0;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (!kvSet.isEmpty()) {
-      try {
-        for (KeyValue kv : kvSet) {
-          writer.write(keyWritable, kv);
-          totalNumBytes += keyWritable.getLength() + kv.getLength();
-        }
-        kvSet.clear();
-        // Update row statistics
-        if (enabledStats) {
-          stats.incrementRow();
-        }
-      } catch (InterruptedException e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
-
-    if (enabledStats) {
-      stats.setNumBytes(totalNumBytes);
-    }
-    if (writer != null) {
-      try {
-        writer.close(writerContext);
-        committer.commitTask(writerContext);
-      } catch (InterruptedException e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
-  }
-}
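addTuple() above buffers every cell that shares the current row key in a TreeSet and flushes the set when the row key changes, because HFiles must be written in strict key order. A self-contained sketch of that buffer-and-flush pattern, with Strings standing in for row keys and KeyValue cells:

    import java.util.TreeSet;

    public class RowBufferExample {
      private final TreeSet<String> cellBuffer = new TreeSet<String>(); // stand-in for TreeSet<KeyValue>
      private String currentRowKey;

      void add(String rowKey, String cell) {
        // Row key changed: emit the previous row's cells before starting a new row.
        if (currentRowKey != null && !currentRowKey.equals(rowKey)) {
          flush();
        }
        currentRowKey = rowKey;
        cellBuffer.add(cell); // cells stay sorted within the row
      }

      void flush() {
        for (String cell : cellBuffer) {
          System.out.println(currentRowKey + " -> " + cell); // writer.write(...) in the real code
        }
        cellBuffer.clear();
      }

      public static void main(String[] args) {
        RowBufferExample appender = new RowBufferExample();
        appender.add("row1", "cf:b");
        appender.add("row1", "cf:a"); // buffered and sorted before row1 is flushed
        appender.add("row2", "cf:a"); // key change flushes row1's cells first
        appender.flush();             // like close(): flush the final row
      }
    }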

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
deleted file mode 100644
index 3a58e50..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-
-public class IndexPredication {
-  private Column column;
-  private int columnId;
-  private Datum startValue;
-  private Datum stopValue;
-
-  public Column getColumn() {
-    return column;
-  }
-
-  public void setColumn(Column column) {
-    this.column = column;
-  }
-
-  public int getColumnId() {
-    return columnId;
-  }
-
-  public void setColumnId(int columnId) {
-    this.columnId = columnId;
-  }
-
-  public Datum getStartValue() {
-    return startValue;
-  }
-
-  public void setStartValue(Datum startValue) {
-    this.startValue = startValue;
-  }
-
-  public Datum getStopValue() {
-    return stopValue;
-  }
-
-  public void setStopValue(Datum stopValue) {
-    this.stopValue = stopValue;
-  }
-}
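A hedged usage sketch for this holder: a row-key range predicate such as WHERE key >= 10 AND key < 20 would be carried to the scanner as start/stop values along these lines (the column id and the literal values are illustrative assumptions):

    import org.apache.tajo.datum.DatumFactory;
    import org.apache.tajo.storage.hbase.IndexPredication;

    public class IndexPredicationExample {
      public static void main(String[] args) {
        IndexPredication predication = new IndexPredication();
        predication.setColumnId(0); // position of the row-key column in the schema
        predication.setStartValue(DatumFactory.createInt4(10)); // scan start
        predication.setStopValue(DatumFactory.createInt4(20));  // scan stop
        System.out.println(predication.getStartValue() + " .. " + predication.getStopValue());
      }
    }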

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
deleted file mode 100644
index 4577703..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-public class RowKeyMapping {
-  private boolean isBinary;
-  private int keyFieldIndex;
-
-  public boolean isBinary() {
-    return isBinary;
-  }
-
-  public void setBinary(boolean isBinary) {
-    this.isBinary = isBinary;
-  }
-
-  public int getKeyFieldIndex() {
-    return keyFieldIndex;
-  }
-
-  public void setKeyFieldIndex(int keyFieldIndex) {
-    this.keyFieldIndex = keyFieldIndex;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
deleted file mode 100644
index ccba3be..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.TupleComparator;
-
-import java.io.IOException;
-
-public interface IndexMethod {
-  IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
-      TupleComparator comparator) throws IOException;
-  IndexReader getIndexReader(final Path fileName, Schema keySchema,
-      TupleComparator comparator) throws IOException;
-}
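A hedged wiring sketch for this factory interface: one IndexMethod implementation hands out the matching writer/reader pair over the same file, key schema, and comparator. The level argument and the write-in-key-order step are assumptions here, not part of the interface contract:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.storage.TupleComparator;
    import org.apache.tajo.storage.index.IndexMethod;
    import org.apache.tajo.storage.index.IndexReader;
    import org.apache.tajo.storage.index.IndexWriter;

    public class IndexMethodUsage {
      static IndexReader buildAndOpen(IndexMethod method, Path file, Schema keySchema,
                                      TupleComparator comparator) throws IOException {
        IndexWriter writer = method.getIndexWriter(file, 2 /* level, assumed */, keySchema, comparator);
        // ... write (key, offset) pairs in key order here ...
        writer.close();
        return method.getIndexReader(file, keySchema, comparator);
      }
    }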

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
deleted file mode 100644
index 7baf7aa..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public interface IndexReader {
-  
-  /**
-   * Find the offset of the entry whose key is equal to the given key.
-   *
-   * @param key the key to look up
-   * @return the offset of the matching entry
-   * @throws IOException if the index cannot be read
-   */
-  public long find(Tuple key) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
deleted file mode 100644
index 04738f8..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public abstract class IndexWriter {
-  
-  public abstract void write(Tuple key, long offset) throws IOException;
-  
-  public abstract void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
deleted file mode 100644
index 688bbc7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public interface OrderIndexReader extends IndexReader {
-  /**
-   * Find the offset of the entry whose key is equal to or greater than
-   * the given key.
-   *
-   * @param key the key to look up
-   * @param nextKey if true, position at the entry following the given key
-   * @return the offset of the matching entry
-   * @throws IOException if the index cannot be read
-   */
-  public long find(Tuple key, boolean nextKey) throws IOException;
-  
-  /**
-   * Return the offset following the one returned by the latest call to find() or next().
-   *
-   * @return the next offset
-   * @throws IOException if the index cannot be read
-   */
-  public long next() throws IOException;
-}
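A hedged sketch of the intended read pattern for this interface: position at the first entry whose key is equal to or greater than a start key, then walk forward with next(). Treating a negative offset as "exhausted" is an illustrative assumption; the actual sentinel belongs to the implementing class:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.tajo.storage.Tuple;
    import org.apache.tajo.storage.index.OrderIndexReader;

    public class OrderIndexScanExample {
      static List<Long> collectOffsets(OrderIndexReader reader, Tuple startKey) throws IOException {
        List<Long> offsets = new ArrayList<Long>();
        long offset = reader.find(startKey, true); // true: accept the next greater key
        while (offset >= 0) {                      // assumed end-of-index sentinel
          offsets.add(offset);
          offset = reader.next();
        }
        return offsets;
      }
    }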

