tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-2000: BSTIndex can cause OOM.
Date Mon, 07 Dec 2015 05:07:03 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.1 8cdd21f7b -> 70f6c1264


TAJO-2000: BSTIndex can cause OOM.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/70f6c126
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/70f6c126
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/70f6c126

Branch: refs/heads/branch-0.11.1
Commit: 70f6c126416fdc43e57b26bc01b8271daf1c8bf4
Parents: 8cdd21f
Author: Jinho Kim <jhkim@apache.org>
Authored: Mon Dec 7 14:05:44 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Mon Dec 7 14:05:44 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/storage/BufferPool.java     |  30 +-
 .../org/apache/tajo/tuple/memory/HeapTuple.java |  15 +
 .../apache/tajo/tuple/memory/UnSafeTuple.java   |  14 +
 .../java/org/apache/tajo/util/FileUtil.java     |  22 +
 .../planner/physical/BSTIndexScanExec.java      |   7 +-
 .../physical/RangeShuffleFileWriteExec.java     |  11 +-
 .../engine/planner/physical/StoreIndexExec.java |   2 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   |   4 +-
 .../tajo/pullserver/TajoPullServerService.java  | 162 ++++---
 .../apache/tajo/storage/index/bst/BSTIndex.java | 430 +++++++++++++------
 .../apache/tajo/storage/index/TestBSTIndex.java | 135 +++++-
 .../index/TestSingleCSVFileBSTIndex.java        |   8 +-
 13 files changed, 594 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index fffaaf7..3c68bc5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,8 @@ Release 0.11.1 - unreleased
 
   BUG FIXES
 
+    TAJO-2000: BSTIndex can cause OOM. (jinho)
+
     TAJO-2010: Parquet can not read null value. (jinho)
 
     TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
index 4913d3b..7c4e288 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -94,17 +94,39 @@ public class BufferPool {
 
 
   public static ByteBuf directBuffer(int size) {
-    return ALLOCATOR.directBuffer(size);
+    return directBuffer(size, ByteOrder.LITTLE_ENDIAN);
+  }
+
+  /**
+   * @param size  the initial capacity
+   * @param order the endianness
+   * @return allocated ByteBuf from pool
+   */
+  public static ByteBuf directBuffer(int size, ByteOrder order) {
+    ByteBuf byteBuf = ALLOCATOR.directBuffer(size);
+    if (byteBuf.order() != order) byteBuf.order(order);
+    return byteBuf;
   }
 
   /**
-   *
    * @param size the initial capacity
-   * @param max the max capacity
+   * @param max  the max capacity
    * @return allocated ByteBuf from pool
    */
   public static ByteBuf directBuffer(int size, int max) {
-    return ALLOCATOR.directBuffer(size, max).order(ByteOrder.LITTLE_ENDIAN);
+    return directBuffer(size, max, ByteOrder.LITTLE_ENDIAN);
+  }
+
+  /**
+   * @param size  the initial capacity
+   * @param max   the max capacity
+   * @param order the endianness
+   * @return allocated ByteBuf from pool
+   */
+  public static ByteBuf directBuffer(int size, int max, ByteOrder order) {
+    ByteBuf byteBuf = ALLOCATOR.directBuffer(size, max);
+    if (byteBuf.order() != order) byteBuf.order(order);
+    return byteBuf;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
index c6401ec..aa83d17 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
@@ -33,6 +33,7 @@ import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.datetime.TimeMeta;
 
 import java.nio.ByteOrder;
+import java.util.Arrays;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
 
@@ -296,6 +297,20 @@ public class HeapTuple extends ZeroCopyTuple implements Cloneable {
   }
 
   @Override
+  public int hashCode() {
+    return Arrays.hashCode(getValues());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Tuple) {
+      Tuple other = (Tuple) obj;
+      return Arrays.equals(getValues(), other.getValues());
+    }
+    return false;
+  }
+
+  @Override
   public Tuple clone() throws CloneNotSupportedException {
     HeapTuple heapTuple = (HeapTuple) super.clone();
     heapTuple.buffer = buffer.copy(getRelativePos(), getLength());

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
index 62e29b8..ea6332b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
@@ -36,6 +36,7 @@ import sun.misc.Unsafe;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
 
@@ -338,6 +339,19 @@ public class UnSafeTuple extends ZeroCopyTuple {
   }
 
   @Override
+  public int hashCode() {
+    return Arrays.hashCode(getValues());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Tuple) {
+      Tuple other = (Tuple) obj;
+      return Arrays.equals(getValues(), other.getValues());
+    }
+    return false;
+  }
+  @Override
   public String toString() {
     return VTuple.toDisplayString(getValues());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
index 118f42a..95700d0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
@@ -120,4 +120,26 @@ public class FileUtil {
       }
     }
   }
+
+  /**
+   * Close the Closeable objects and <b>throw</b> first {@link IOException}, if failed
+   * @param closeables the objects to close
+   */
+  public static void cleanupAndthrowIfFailed(java.io.Closeable... closeables) throws IOException {
+    IOException ioe = null;
+
+    for (java.io.Closeable c : closeables) {
+      if (c != null) {
+        try {
+          c.close();
+        } catch (IOException e) {
+          if (ioe == null) ioe = e;
+        }
+      }
+    }
+
+    if (ioe != null) {
+      throw ioe;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 5352649..1b7f32c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -90,7 +90,6 @@ public class BSTIndexScanExec extends ScanExec {
     Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment));
     this.reader = new BSTIndex(context.getConf()).
         getIndexReader(indexPath, keySchema, comparator);
-    this.reader.open();
   }
 
   private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) {
@@ -101,9 +100,7 @@ public class BSTIndexScanExec extends ScanExec {
       qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
     }
     for (Column column : originalSchema.getRootColumns()) {
-      if (subSchema.contains(column)
-          || qualAndTargets.contains(column)
-          || qualAndTargets.contains(column)) {
+      if (subSchema.contains(column) || qualAndTargets.contains(column)) {
         mergedSchema.addColumn(column);
       }
     }
@@ -127,6 +124,8 @@ public class BSTIndexScanExec extends ScanExec {
 
   @Override
   public void init() throws IOException {
+    reader.init();
+
     Schema projected;
 
     // in the case where projected column or expression are given

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index bcd2b17..e4217b3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -86,9 +86,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
     this.appender.enableStats(keySchema.getAllColumns());
     this.appender.init();
     this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    this.indexWriter.setLoadNum(100);
-    this.indexWriter.open();
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true);
+    this.indexWriter.init();
 
     super.init();
   }
@@ -121,13 +120,11 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
     super.close();
 
     appender.flush();
-    IOUtils.cleanup(LOG, appender);
-    indexWriter.flush();
-    IOUtils.cleanup(LOG, indexWriter);
-
     // Collect statistics data
     context.setResultStats(appender.getStats());
     context.addShuffleFileOutput(0, context.getTaskId().toString());
+    IOUtils.cleanup(LOG, appender);
+    indexWriter.close();
     appender = null;
     indexWriter = null;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
index fed1d5c..82ef312 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -80,7 +80,7 @@ public class StoreIndexExec extends UnaryPhysicalExec {
     this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
     this.indexWriter = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
     this.indexWriter.setLoadNum(100);
-    this.indexWriter.open();
+    this.indexWriter.init();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index a622c16..61c30dd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -768,8 +768,8 @@ public class TaskImpl implements Task {
 
       try {
         chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
-            } catch (Throwable t) {
-        LOG.error("getFileChunks() throws exception");
+      } catch (Throwable t) {
+        LOG.error(t.getMessage(), t);
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 3aab2f0..d826127 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -661,109 +661,107 @@ public class TajoPullServerService extends AbstractService {
                                         String endKey,
                                         boolean last) throws IOException {
     BSTIndex index = new BSTIndex(new TajoConf());
-    BSTIndex.BSTIndexReader idxReader =
-        index.getIndexReader(new Path(outDir, "index"));
-    idxReader.open();
-    Schema keySchema = idxReader.getKeySchema();
-    TupleComparator comparator = idxReader.getComparator();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
-    }
+    try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index"))) {
+      Schema keySchema = idxReader.getKeySchema();
+      TupleComparator comparator = idxReader.getComparator();
 
-    File data = new File(URI.create(outDir.toUri() + "/output"));
-    byte [] startBytes = Base64.decodeBase64(startKey);
-    byte [] endBytes = Base64.decodeBase64(endKey);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
+      }
 
-    RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-    Tuple start;
-    Tuple end;
-    try {
-      start = decoder.toTuple(startBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("StartKey: " + startKey
-          + ", decoded byte size: " + startBytes.length, t);
-    }
+      File data = new File(URI.create(outDir.toUri() + "/output"));
+      byte[] startBytes = Base64.decodeBase64(startKey);
+      byte[] endBytes = Base64.decodeBase64(endKey);
 
-    try {
-      end = decoder.toTuple(endBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("EndKey: " + endKey
-          + ", decoded byte size: " + endBytes.length, t);
-    }
+      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+      Tuple start;
+      Tuple end;
+      try {
+        start = decoder.toTuple(startBytes);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException("StartKey: " + startKey
+            + ", decoded byte size: " + startBytes.length, t);
+      }
 
-    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
-        (last ? ", last=true" : "") + ")");
+      try {
+        end = decoder.toTuple(endBytes);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException("EndKey: " + endKey
+            + ", decoded byte size: " + endBytes.length, t);
+      }
 
-    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
-      LOG.info("There is no contents");
-      return null;
-    }
+      LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+          (last ? ", last=true" : "") + ")");
 
-    if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
-        comparator.compare(idxReader.getLastKey(), start) < 0) {
-      LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
-          "], but request start:" + start + ", end: " + end);
-      return null;
-    }
+      if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+        LOG.info("There is no contents");
+        return null;
+      }
 
-    long startOffset;
-    long endOffset;
-    try {
-      startOffset = idxReader.find(start);
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
-    try {
-      endOffset = idxReader.find(end);
-      if (endOffset == -1) {
-        endOffset = idxReader.find(end, true);
+      if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
+          comparator.compare(idxReader.getLastKey(), start) < 0) {
+        LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+            "], but request start:" + start + ", end: " + end);
+        return null;
       }
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
 
-    // if startOffset == -1 then case 2-1 or case 3
-    if (startOffset == -1) { // this is a hack
-      // if case 2-1 or case 3
+      long startOffset;
+      long endOffset;
       try {
-        startOffset = idxReader.find(start, true);
+        idxReader.init();
+        startOffset = idxReader.find(start);
       } catch (IOException ioe) {
         LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end +")" + ", idx min: "
+            + "[" + start + ", " + end + ")" + ", idx min: "
+            + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+      try {
+        endOffset = idxReader.find(end);
+        if (endOffset == -1) {
+          endOffset = idxReader.find(end, true);
+        }
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: "
             + idxReader.getFirstKey() + ", idx max: "
             + idxReader.getLastKey());
         throw ioe;
       }
-    }
 
-    if (startOffset == -1) {
-      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
-          "State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-    }
+      // if startOffset == -1 then case 2-1 or case 3
+      if (startOffset == -1) { // this is a hack
+        // if case 2-1 or case 3
+        try {
+          startOffset = idxReader.find(start, true);
+        } catch (IOException ioe) {
+          LOG.error("State Dump (the requested range: "
+              + "[" + start + ", " + end + ")" + ", idx min: "
+              + idxReader.getFirstKey() + ", idx max: "
+              + idxReader.getLastKey());
+          throw ioe;
+        }
+      }
 
-    // if greater than indexed values
-    if (last || (endOffset == -1
-        && comparator.compare(idxReader.getLastKey(), end) < 0)) {
-      endOffset = data.length();
-    }
+      if (startOffset == -1) {
+        throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+            "State Dump (the requested range: "
+            + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+      }
 
-    idxReader.close();
+      // if greater than indexed values
+      if (last || (endOffset == -1
+          && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+        endOffset = data.length();
+      }
 
-    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+      FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
 
-    if(LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
-    return chunk;
+      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+      return chunk;
+    }
   }
 
   public static List<String> splitMaps(List<String> mapq) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index ba81c3e..7919ee7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -18,13 +18,13 @@
 
 package org.apache.tajo.storage.index.bst;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
 import org.apache.tajo.storage.*;
@@ -33,12 +33,14 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.index.IndexMethod;
 import org.apache.tajo.storage.index.IndexWriter;
 import org.apache.tajo.storage.index.OrderIndexReader;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
 
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import java.io.*;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
 import java.util.LinkedList;
-import java.util.Set;
+import java.util.Map;
 import java.util.TreeMap;
 
 import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
@@ -55,6 +57,9 @@ public class BSTIndex implements IndexMethod {
 
   public static final int ONE_LEVEL_INDEX = 1;
   public static final int TWO_LEVEL_INDEX = 2;
+  public static final int DEFAULT_INDEX_LOAD = 4096;
+  public static final int BUFFER_SIZE = 128 * StorageUnit.KB;
+  public static final String WRITER_INDEX_LOAD = "tajo.executor.index.writer.load-num";
 
   private final Configuration conf;
 
@@ -62,10 +67,16 @@ public class BSTIndex implements IndexMethod {
     this.conf = conf;
   }
   
+
+  public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
+      TupleComparator comparator, boolean sorted) throws IOException {
+    return new BSTIndexWriter(fileName, level, keySchema, comparator, sorted);
+  }
+
   @Override
   public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
-      TupleComparator comparator) throws IOException {
-    return new BSTIndexWriter(fileName, level, keySchema, comparator);
+                                       TupleComparator comparator) throws IOException {
+    return getIndexWriter(fileName, level, keySchema, comparator, false);
   }
 
   @Override
@@ -78,23 +89,38 @@ public class BSTIndex implements IndexMethod {
   }
 
   public class BSTIndexWriter extends IndexWriter implements Closeable {
+    private FileChannel outChannel;
+    private RandomAccessFile outRandomAccessFile;
     private FSDataOutputStream out;
-    private FileSystem fs;
+    private long filePos;
+
+    private FileChannel rootOutChannel;
+    private RandomAccessFile rootOutRandomAccessFile;
+    private FSDataOutputStream rootOut;
+
+    private boolean isLocal;
+
     private int level;
-    private int loadNum = 4096;
+    private int loadNum;
     private Path fileName;
+    // Target data set is sorted or not
+    private boolean sorted;
+    private boolean writeRootIndex;
 
     private final Schema keySchema;
     private final TupleComparator compartor;
     private final KeyOffsetCollector collector;
     private KeyOffsetCollector rootCollector;
+    private ByteBuf indexBuffer;
+    private ByteBuf rootIndexBuffer;
 
     private Tuple firstKey;
     private Tuple lastKey;
 
     private RowStoreEncoder rowStoreEncoder;
-
-    // private Tuple lastestKey = null;
+    private int loadCount;
+    private int entrySize;
+    private int rootEntrySize;
 
     /**
      * constructor
@@ -104,25 +130,63 @@ public class BSTIndex implements IndexMethod {
      * @throws java.io.IOException
      */
     public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
-        TupleComparator comparator) throws IOException {
+        TupleComparator comparator, boolean sorted) throws IOException {
       this.fileName = fileName;
       this.level = level;
+      this.writeRootIndex = level == TWO_LEVEL_INDEX;
       this.keySchema = keySchema;
       this.compartor = comparator;
       this.collector = new KeyOffsetCollector(comparator);
+      this.rootCollector = new KeyOffsetCollector(this.compartor);
       this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema);
+      this.sorted = sorted;
+      this.indexBuffer = BufferPool.directBuffer(BUFFER_SIZE, ByteOrder.nativeOrder());
+      this.rootIndexBuffer = BufferPool.directBuffer(BUFFER_SIZE, ByteOrder.nativeOrder());
+      this.loadCount = loadNum = conf.getInt(WRITER_INDEX_LOAD, DEFAULT_INDEX_LOAD);
     }
 
-   public void setLoadNum(int loadNum) {
+    public void setLoadNum(int loadNum) {
       this.loadNum = loadNum;
+      this.loadCount = loadNum;
     }
 
-    public void open() throws IOException {
-      fs = fileName.getFileSystem(conf);
-      if (fs.exists(fileName)) {
-        throw new IOException("ERROR: index file (" + fileName + " already exists");
+    public void init() throws IOException {
+      FileSystem fs = fileName.getFileSystem(conf);
+      Path rootPath = new Path(fileName + ".root");
+      if (fs.exists(fileName) || fs.exists(rootPath)) {
+        throw new IOException("ERROR: index file " + fileName + " or " + rootPath + " already exists");
+      }
+
+      if (fs instanceof LocalFileSystem) {
+        File outFile;
+        try {
+          if (!fs.exists(fileName.getParent())) {
+            fs.mkdirs(fileName.getParent());
+          }
+
+          if (fileName.toUri().getScheme() != null) {
+            outFile = new File(fileName.toUri());
+          } else {
+            outFile = new File(fileName.toString());
+          }
+        } catch (IllegalArgumentException iae) {
+          throw new IOException(iae);
+        }
+
+        outRandomAccessFile = new RandomAccessFile(outFile, "rw");
+        outChannel = outRandomAccessFile.getChannel();
+
+        if (writeRootIndex) {
+          rootOutRandomAccessFile = new RandomAccessFile(new File(outFile.getAbsolutePath() + ".root"), "rw");
+          rootOutChannel = rootOutRandomAccessFile.getChannel();
+        }
+        isLocal = true;
+      } else {
+        out = fs.create(fileName, true);
+        if (writeRootIndex) {
+          rootOut = fs.create(rootPath, true);
+        }
       }
-      out = fs.create(fileName);
     }
 
     @Override
@@ -140,7 +204,83 @@ public class BSTIndex implements IndexMethod {
         lastKey = keyTuple;
       }
 
-      collector.put(keyTuple, offset);
+      if (sorted) {
+         /* root index writing */
+        if (writeRootIndex) {
+          if (loadCount == loadNum) {
+            loadCount = 0;
+            writeRootIndex(rootIndexBuffer, keyTuple, filePos + indexBuffer.writerIndex());
+          }
+          loadCount++;
+        }
+
+        /* leaf index writing */
+        writeIndex(indexBuffer, keyTuple, offset);
+      } else {
+        collector.put(keyTuple, offset);
+      }
+    }
+
+    private void writeIndex(ByteBuf byteBuf, Tuple tuple, Long... offsets) throws IOException {
+
+      byte[] buf = rowStoreEncoder.toBytes(tuple);
+      int size = buf.length + 8 + (offsets.length * 8);
+      if (!byteBuf.isWritable(size)) {
+        byteBuf.ensureWritable(size);
+      }
+
+      // key writing
+      byteBuf.writeInt(buf.length);
+      byteBuf.writeBytes(buf);
+
+      //offset num writing
+      byteBuf.writeInt(offsets.length);
+
+      /* offset writing */
+      for (long offset : offsets) {
+        byteBuf.writeLong(offset);
+      }
+
+      entrySize++;
+      // flush to file and reset buffer
+      if (byteBuf.writerIndex() >= BUFFER_SIZE) {
+        filePos += flushBuffer(byteBuf, outChannel, out);
+      }
+    }
+
+    private void writeRootIndex(ByteBuf byteBuf, Tuple tuple, long offset) throws IOException {
+      byte[] buf = rowStoreEncoder.toBytes(tuple);
+      int size = buf.length + 12;
+      if (!byteBuf.isWritable(size)) {
+        byteBuf.ensureWritable(size);
+      }
+
+      // key writing
+      byteBuf.writeInt(buf.length);
+      byteBuf.writeBytes(buf);
+
+      // leaf offset writing
+      byteBuf.writeLong(offset);
+
+      rootEntrySize++;
+      // flush to file and reset buffer
+      if (byteBuf.writerIndex() >= BUFFER_SIZE) {
+        flushBuffer(byteBuf, rootOutChannel, rootOut);
+      }
+    }
+
+    private int flushBuffer(ByteBuf byteBuf, FileChannel channel, FSDataOutputStream out) throws IOException {
+      // write buffer to file
+      int readableBytes = byteBuf.readableBytes();
+      if (readableBytes > 0) {
+        if (isLocal) {
+          byteBuf.readBytes(channel, readableBytes);
+        } else {
+          byteBuf.readBytes(out, readableBytes);
+        }
+        byteBuf.clear();
+      }
+      return readableBytes;
     }
 
     public TupleComparator getComparator() {
@@ -148,107 +288,128 @@ public class BSTIndex implements IndexMethod {
     }
 
     public void flush() throws IOException {
-      out.flush();
+      if (out != null) {
+        // account for the flushed bytes in filePos; root-index offsets computed after this call
+        // (filePos + indexBuffer.writerIndex()) would otherwise point too early in the file
+        filePos += flushBuffer(indexBuffer, outChannel, out);
+        out.flush();
+      }
+
+      if (writeRootIndex && rootOut != null) {
+        flushBuffer(rootIndexBuffer, rootOutChannel, rootOut);
+        rootOut.flush();
+      }
     }
 
-    public void writeHeader(int entryNum) throws IOException {
+    /**
+     * Appends the metadata footer to the leaf index file: key schema, comparator, index level,
+     * entry count and, when there are entries, the first and last keys. The footer ends with a
+     * 4-byte length (which counts itself) so that a reader can locate it from the end of the file.
+     *
+     * @param entryNum number of leaf entries written
+     */
+    public void writeFooter(int entryNum) throws IOException {
+      // spill any pending leaf entries rather than discarding them, so the footer always follows
+      // the data section and filePos accounts for every byte already written
+      filePos += flushBuffer(indexBuffer, outChannel, out);
+
+      long startPosition = filePos;
       // schema
       byte [] schemaBytes = keySchema.getProto().toByteArray();
-      out.writeInt(schemaBytes.length);
-      out.write(schemaBytes);
-
       // comparator
       byte [] comparatorBytes = compartor.getProto().toByteArray();
-      out.writeInt(comparatorBytes.length);
-      out.write(comparatorBytes);
+
+      // two length prefixes + level + entry count = 16 bytes besides the payloads
+      int size = schemaBytes.length + comparatorBytes.length + 16;
+      if(!indexBuffer.isWritable(size)) {
+        indexBuffer.ensureWritable(size);
+      }
+
+      indexBuffer.writeInt(schemaBytes.length);
+      indexBuffer.writeBytes(schemaBytes);
+
+      indexBuffer.writeInt(comparatorBytes.length);
+      indexBuffer.writeBytes(comparatorBytes);
 
       // level
-      out.writeInt(this.level);
+      indexBuffer.writeInt(this.level);
       // entry
-      out.writeInt(entryNum);
+      indexBuffer.writeInt(entryNum);
       if (entryNum > 0) {
         byte [] minBytes = rowStoreEncoder.toBytes(firstKey);
-        out.writeInt(minBytes.length);
-        out.write(minBytes);
         byte [] maxBytes = rowStoreEncoder.toBytes(lastKey);
-        out.writeInt(maxBytes.length);
-        out.write(maxBytes);
-      }
-      out.flush();
-    }
 
-    public void close() throws IOException {
-      /* two level initialize */
-      if (this.level == TWO_LEVEL_INDEX) {
-        rootCollector = new KeyOffsetCollector(this.compartor);
+        // two 4-byte length prefixes plus 4 bytes of room for the footer-length int written below
+        size = minBytes.length + maxBytes.length + 12;
+        if(!indexBuffer.isWritable(size)) {
+          filePos += flushBuffer(indexBuffer, outChannel, out);
+          indexBuffer.ensureWritable(size);
+        }
+
+        indexBuffer.writeInt(minBytes.length);
+        indexBuffer.writeBytes(minBytes);
+        indexBuffer.writeInt(maxBytes.length);
+        indexBuffer.writeBytes(maxBytes);
       }
 
-      /* data writing phase */
-      TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
-      Set<Tuple> keySet = keyOffsetMap.keySet();
+      // write footer length: bytes flushed since startPosition + bytes still buffered + this int
+      int footerSize = (int) (filePos + indexBuffer.readableBytes() + 4 - startPosition);
+      indexBuffer.writeInt(footerSize);
 
-      int entryNum = keySet.size();
-      writeHeader(entryNum);
+      filePos += flushBuffer(indexBuffer, outChannel, out);
+    }
 
-      int loadCount = this.loadNum - 1;
-      for (Tuple key : keySet) {
+    /**
+     * Finishes the index: writes the remaining leaf entries (straight from the buffer when the
+     * input was sorted, otherwise from the in-memory collector), then the footer, then, for a
+     * two-level index, the root entries followed by the root header (loadNum, root entry count).
+     * Buffers are released and the channels/streams are passed to
+     * FileUtil.cleanupAndthrowIfFailed in a finally block.
+     */
+    public void close() throws IOException {
+      /* data writing phase */
+      try {
+        if (sorted) {
+          // write remaining data to file
+          filePos += flushBuffer(indexBuffer, outChannel, out);
+        } else {
+          // flush collected index data
+          TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
+          for (Map.Entry<Tuple, LinkedList<Long>> entry : keyOffsetMap.entrySet()) {
+
+            /* two level initialize */
+            if (writeRootIndex) {
+              // each time loadCount reaches loadNum, record a root entry pointing at the position
+              // this leaf entry will occupy in the file (flushed bytes + bytes still buffered)
+              if (loadCount == loadNum) {
+                loadCount = 0;
+                rootCollector.put(entry.getKey(), filePos + indexBuffer.writerIndex());
+              }
+              loadCount++;
+            }
 
-        if (this.level == TWO_LEVEL_INDEX) {
-          loadCount++;
-          if (loadCount == this.loadNum) {
-            rootCollector.put(key, out.getPos());
-            loadCount = 0;
+            LinkedList<Long> offsetList = entry.getValue();
+            writeIndex(indexBuffer, entry.getKey(), offsetList.toArray(new Long[offsetList.size()]));
           }
+          filePos += flushBuffer(indexBuffer, outChannel, out);
+          collector.clear();
         }
-        /* key writing */
-        byte[] buf = rowStoreEncoder.toBytes(key);
-        out.writeInt(buf.length);
-        out.write(buf);
-
-        /**/
-        LinkedList<Long> offsetList = keyOffsetMap.get(key);
-        /* offset num writing */
-        int offsetSize = offsetList.size();
-        out.writeInt(offsetSize);
-        /* offset writing */
-        for (Long offset : offsetList) {
-          out.writeLong(offset);
-        }
-      }
 
-      out.flush();
-      out.close();
-      keySet.clear();
-      collector.clear();
+        writeFooter(entrySize);
 
-      FSDataOutputStream rootOut = null;
-      /* root index creating phase */
-      if (this.level == TWO_LEVEL_INDEX) {
-        TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
-        keySet = rootMap.keySet();
-
-        rootOut = fs.create(new Path(fileName + ".root"));
-        rootOut.writeInt(this.loadNum);
-        rootOut.writeInt(keySet.size());
-
-        /* root key writing */
-        for (Tuple key : keySet) {
-          byte[] buf = rowStoreEncoder.toBytes(key);
-          rootOut.writeInt(buf.length);
-          rootOut.write(buf);
-
-          LinkedList<Long> offsetList = rootMap.get(key);
-          if (offsetList.size() > 1 || offsetList.size() == 0) {
-            throw new IOException("Why root index doen't have one offset?");
-          }
-          rootOut.writeLong(offsetList.getFirst());
+        /* root index creating phase */
+        if (writeRootIndex) {
+          if (sorted) {
+            // the root header (loadNum, root entry count) goes at the end of the root file;
+            // BSTIndexReader.fillData reads it from the last 8 bytes
+            //write root index header
+            rootIndexBuffer.writeInt(loadNum);
+            rootIndexBuffer.writeInt(rootEntrySize);
 
+            // write remaining data to file
+            flushBuffer(rootIndexBuffer, rootOutChannel, rootOut);
+          } else {
+            TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
+            rootIndexBuffer.clear();
+            /* root key writing */
+            for (Map.Entry<Tuple, LinkedList<Long>> entry : rootMap.entrySet()) {
+              LinkedList<Long> offsetList = entry.getValue();
+              if (offsetList.size() != 1) {
+                throw new IOException("Why root index doen't have one offset? offsets:" + offsetList.size());
+              }
+              writeRootIndex(rootIndexBuffer, entry.getKey(), offsetList.getFirst());
+            }
+
+            //write root index header
+            rootIndexBuffer.writeInt(this.loadNum);
+            rootIndexBuffer.writeInt(rootEntrySize);
+
+            flushBuffer(rootIndexBuffer, rootOutChannel, rootOut);
+            rootCollector.clear();
+          }
         }
-        rootOut.flush();
-        rootOut.close();
+      } finally {
+        // NOTE(review): if release() throws (e.g. close() invoked twice on already-released
+        // buffers), the files below are not cleaned up — confirm close() is called only once
+        indexBuffer.release();
+        rootIndexBuffer.release();
 
-        keySet.clear();
-        rootCollector.clear();
+        FileUtil.cleanupAndthrowIfFailed(outChannel, outRandomAccessFile, out,
+            rootOutChannel, rootOutRandomAccessFile, rootOut);
       }
     }
 
@@ -289,7 +450,6 @@ public class BSTIndex implements IndexMethod {
 
     private FileSystem fs;
     private FSDataInputStream indexIn;
-    private FSDataInputStream subIn;
 
     private int level;
     private int entryNum;
@@ -301,6 +461,7 @@ public class BSTIndex implements IndexMethod {
     private int rootCursor;
     private int keyCursor;
     private int offsetCursor;
+    // length of the leaf data section (file length minus footer size), set by loadFooter;
+    // fillLeafIndex throws EOFException when it would read at or past this position
+    private long dataLength;
 
     // mutex
     private final Object mutex = new Object();
@@ -319,10 +480,12 @@ public class BSTIndex implements IndexMethod {
       this.keySchema = keySchema;
       this.comparator = comparator;
       this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
+      open();
     }
 
     public BSTIndexReader(final Path fileName) throws IOException {
       this.fileName = fileName;
+      // opens the file and reads its footer; the comparator (and presumably the key schema)
+      // are taken from that footer
+      open();
     }
 
     public Schema getKeySchema() {
@@ -333,11 +496,21 @@ public class BSTIndex implements IndexMethod {
       return this.comparator;
     }
 
-    private void readHeader() throws IOException {
+    /**
+     * Reads the index metadata footer from the end of the file. The last 4 bytes hold the footer
+     * length (which counts those 4 bytes as well); everything before the footer is leaf data, and
+     * its length is kept in {@code dataLength}.
+     *
+     * @throws IOException if the footer cannot be read or its recorded size is invalid
+     */
+    private void loadFooter() throws IOException {
+      long fileLength = fs.getFileStatus(this.fileName).getLen();
+
+      //read footer
+      indexIn.seek(fileLength - 4);
+      int footerSize = indexIn.readInt();
+      // the footer contains at least its own length field and cannot be larger than the file
+      if (footerSize < 4 || footerSize > fileLength) {
+        throw new IOException("Invalid index footer. Path:" + fileName + ", footer size:" + footerSize
+            + ", file length:" + fileLength);
+      }
+      dataLength = fileLength - footerSize;
+      // readFully loops until the whole footer is read; a single InputStream read (which is what
+      // ByteBuf.writeBytes(InputStream, int) performs) may legally return fewer bytes
+      byte[] footerBytes = new byte[footerSize];
+      indexIn.seek(dataLength);
+      indexIn.readFully(footerBytes);
+      ByteBuf byteBuf = Unpooled.wrappedBuffer(footerBytes);
+
       // schema
-      int schemaByteSize = indexIn.readInt();
+      int schemaByteSize = byteBuf.readInt();
       byte [] schemaBytes = new byte[schemaByteSize];
-      StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize);
+      byteBuf.readBytes(schemaBytes);
 
       SchemaProto.Builder builder = SchemaProto.newBuilder();
       builder.mergeFrom(schemaBytes);
@@ -346,30 +519,35 @@ public class BSTIndex implements IndexMethod {
       this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
 
       // comparator
-      int compByteSize = indexIn.readInt();
+      int compByteSize = byteBuf.readInt();
       byte [] compBytes = new byte[compByteSize];
-      StorageUtil.readFully(indexIn, compBytes, 0, compByteSize);
+      byteBuf.readBytes(compBytes);
 
       TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
       compProto.mergeFrom(compBytes);
       this.comparator = new BaseTupleComparator(compProto.build());
 
       // level
-      this.level = indexIn.readInt();
+      this.level = byteBuf.readInt();
       // entry
-      this.entryNum = indexIn.readInt();
+      this.entryNum = byteBuf.readInt();
       if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values
-        byte [] minBytes = new byte[indexIn.readInt()];
-        StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length);
+        byte [] minBytes = new byte[byteBuf.readInt()];
+        byteBuf.readBytes(minBytes);
         this.firstKey = rowStoreDecoder.toTuple(minBytes);
 
-        byte [] maxBytes = new byte[indexIn.readInt()];
-        StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length);
+        byte [] maxBytes = new byte[byteBuf.readInt()];
+        byteBuf.readBytes(maxBytes);
         this.lastKey = rowStoreDecoder.toTuple(maxBytes);
       }
+      byteBuf.release();
+    }
+
+    /**
+     * Loads the index entries into memory via fillData(). The footer metadata has already been
+     * read when this reader was constructed.
+     */
+    public void init() throws IOException {
+      fillData();
     }
 
-    public void open()
+    /**
+     * Opens the index file and reads its footer; index entries are loaded later by init().
+     * The stream is closed again if the footer cannot be read.
+     */
+    private void open()
         throws IOException {
       /* init the index file */
       fs = fileName.getFileSystem(conf);
@@ -378,11 +556,11 @@ public class BSTIndex implements IndexMethod {
       }
 
       indexIn = fs.open(this.fileName);
-      readHeader();
-      fillData();
+      try {
+        loadFooter();
+      } catch (IOException | RuntimeException e) {
+        // open() runs from the constructors, so on failure no caller gets a reader to close:
+        // release the stream here instead of leaking it
+        try {
+          indexIn.close();
+        } catch (IOException closeError) {
+          e.addSuppressed(closeError);
+        }
+        throw e;
+      }
     }
 
     private void fillData() throws IOException {
+      indexIn.seek(0);
       /* load on memory */
       if (this.level == TWO_LEVEL_INDEX) {
 
@@ -391,13 +569,16 @@ public class BSTIndex implements IndexMethod {
           throw new FileNotFoundException("root index did not created");
         }
 
-        subIn = indexIn;
-        indexIn = fs.open(rootPath);
+        try (FSDataInputStream rootIndexIn = fs.open(rootPath)) {
+          long fileLength = fs.getFileStatus(rootPath).getLen();
         /* root index header reading : type => loadNum => indexSize */
-        this.loadNum = indexIn.readInt();
-        this.entryNum = indexIn.readInt();
-        /**/
-        fillRootIndex(entryNum, indexIn);
+          rootIndexIn.seek(fileLength - 8);
+          this.loadNum = rootIndexIn.readInt();
+          this.entryNum = rootIndexIn.readInt();
+
+          rootIndexIn.seek(0);
+          fillRootIndex(entryNum, rootIndexIn);
+        }
 
       } else {
         fillLeafIndex(entryNum, indexIn, -1);
@@ -455,7 +636,7 @@ public class BSTIndex implements IndexMethod {
           } else {
             if (offsetIndex.length -1 > rootCursor) {
               rootCursor++;
-              fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
+              fillLeafIndex(loadNum + 1, indexIn, this.offsetIndex[rootCursor]);
               keyCursor = 1;
               offsetCursor = 0;
             } else {
@@ -485,6 +666,10 @@ public class BSTIndex implements IndexMethod {
         byte[] buf;
         for (int i = 0; i < entryNum; i++) {
           counter++;
+
+          if (in.getPos() >= dataLength)
+            throw new EOFException("Path:" + fileName + ", Pos: " + in.getPos() + ", Data len:" + dataLength);
+
           buf = new byte[in.readInt()];
           StorageUtil.readFully(in, buf, 0, buf.length);
           dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
@@ -494,10 +679,10 @@ public class BSTIndex implements IndexMethod {
           for (int j = 0; j < offsetNum; j++) {
             this.offsetSubIndex[i][j] = in.readLong();
           }
-
         }
 
       } catch (IOException e) {
+        //TODO this block should fix correctly
         counter--;
         if (pos != -1) {
           in.seek(pos);
@@ -567,9 +752,9 @@ public class BSTIndex implements IndexMethod {
       } else {
         rootCursor = 0;
       }
-      fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
+      fillLeafIndex(loadNum, indexIn, this.offsetIndex[rootCursor]);
       pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
-       
+
       return pos;
     }
 
@@ -618,7 +803,6 @@ public class BSTIndex implements IndexMethod {
     @Override
     public void close() throws IOException {
+      // only the leaf stream is held open; fillData opens and closes the root index stream itself
       this.indexIn.close();
-      this.subIn.close();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 30cea60..a9d8ce2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -122,7 +122,7 @@ public class TestBSTIndex {
         BSTIndex.TWO_LEVEL_INDEX,
         keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -147,7 +147,7 @@ public class TestBSTIndex {
 
     tuple = new VTuple(keySchema.size());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
-    reader.open();
+    reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
@@ -197,7 +197,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + dataFormat + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     Tuple tuple;
     long offset;
@@ -227,7 +227,7 @@ public class TestBSTIndex {
     tuple = new VTuple(keySchema.size());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
@@ -290,7 +290,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -315,7 +315,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
     for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
       keyTuple.put(0, DatumFactory.createInt8(i));
       keyTuple.put(1, DatumFactory.createFloat8(i));
@@ -363,7 +363,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + dataFormat + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -388,7 +388,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
@@ -456,7 +456,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -481,7 +481,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
@@ -538,7 +538,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + dataFormat + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -565,7 +565,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
@@ -624,7 +624,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + dataFormat + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -649,7 +649,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
 
     Tuple min = reader.getFirstKey();
     assertEquals(5, min.getInt4(0));
@@ -731,7 +731,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -756,7 +756,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
 
     Thread[] threads = new Thread[5];
     ConcurrentAccessor[] accs = new ConcurrentAccessor[5];
@@ -812,9 +812,9 @@ public class TestBSTIndex {
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + dataFormat + ".idx"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -841,7 +841,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
@@ -906,7 +906,7 @@ public class TestBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
         "testFindNextKeyValueDescOrder_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
         getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -932,7 +932,7 @@ public class TestBSTIndex {
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + dataFormat + ".idx"),
         keySchema, comp);
-    reader.open();
+    reader.init();
 
     assertEquals(keySchema, reader.getKeySchema());
     assertEquals(comp, reader.getComparator());
@@ -965,4 +965,97 @@ public class TestBSTIndex {
     reader.close();
     scanner.close();
   }
+
+  @Test
+  public void testFindValueASCOrder() throws IOException {
+    meta = CatalogUtil.newTableMeta(dataFormat);
+
+    // distinct file names so this test never shares table or index files with testFindValue
+    Path tablePath = new Path(testDir, "testFindValueASCOrder_" + dataFormat);
+    Path indexPath = new Path(testDir, "testFindValueASCOrder_" + dataFormat + ".idx");
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
+        .getAppender(meta, schema, tablePath);
+    appender.init();
+    Tuple tuple;
+
+    // order by asc
+    for (int i = 0; i < TUPLE_NUM; i++) {
+      tuple = new VTuple(5);
+      tuple.put(0, DatumFactory.createInt4(i));
+      tuple.put(1, DatumFactory.createInt8(i));
+      tuple.put(2, DatumFactory.createFloat8(i));
+      tuple.put(3, DatumFactory.createFloat4(i));
+      tuple.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
+
+    SortSpec[] sortKeys = new SortSpec[2];
+    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
+    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
+
+    Schema keySchema = new Schema();
+    keySchema.addColumn(new Column("long", Type.INT8));
+    keySchema.addColumn(new Column("double", Type.FLOAT8));
+
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
+
+    BSTIndex bst = new BSTIndex(conf);
+    // keys are written in ascending order; the trailing 'true' presumably selects the writer's
+    // sorted path — confirm against BSTIndex.getIndexWriter
+    BSTIndexWriter creater = bst.getIndexWriter(indexPath,
+        BSTIndex.TWO_LEVEL_INDEX,
+        keySchema, comp, true);
+    creater.setLoadNum(LOAD_NUM);
+    creater.init();
+
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+    scanner.init();
+
+    Tuple keyTuple;
+    long offset;
+    while (true) {
+      keyTuple = new VTuple(2);
+      offset = scanner.getNextOffset();
+      tuple = scanner.next();
+      if (tuple == null) break;
+
+      keyTuple.put(0, tuple.asDatum(1));
+      keyTuple.put(1, tuple.asDatum(2));
+      creater.write(keyTuple, offset);
+    }
+
+    creater.flush();
+    creater.close();
+    scanner.close();
+
+    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
+    reader.init();
+    scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+    scanner.init();
+
+    // a dedicated search key, so building the next key never mutates a tuple returned by the scanner
+    Tuple searchKey = new VTuple(keySchema.size());
+    Tuple found;
+    for (int i = 0; i < TUPLE_NUM - 1; i++) {
+      searchKey.put(0, DatumFactory.createInt8(i));
+      searchKey.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(searchKey);
+      scanner.seek(offsets);
+      found = scanner.next();
+      assertTrue("seek check [" + (i) + " ," + (found.getInt8(1)) + "]", (i) == (found.getInt8(1)));
+      assertTrue("seek check [" + (i) + " ," + (found.getFloat8(2)) + "]", (i) == (found.getFloat8(2)));
+
+      offsets = reader.next();
+      if (offsets == -1) {
+        continue;
+      }
+      scanner.seek(offsets);
+      found = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.getInt4(0)));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.getInt8(1)));
+    }
+    reader.close();
+    scanner.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/70f6c126/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 8262073..b2ca5b8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -109,7 +109,7 @@ public class TestSingleCSVFileBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
         "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
 
     SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
         .getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -135,7 +135,7 @@ public class TestSingleCSVFileBSTIndex {
     tuple = new VTuple(keySchema.size());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir,
         "FindValueInCSV.idx"), keySchema, comp);
-    reader.open();
+    reader.init();
     fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
         .getSeekableScanner(meta, schema, tablet.getProto(), schema);
     fileScanner.init();
@@ -200,7 +200,7 @@ public class TestSingleCSVFileBSTIndex {
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
-    creater.open();
+    creater.init();
     
     SeekableScanner fileScanner  = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
         .getSeekableScanner(meta, schema, tablet.getProto(), schema);
@@ -223,7 +223,7 @@ public class TestSingleCSVFileBSTIndex {
     fileScanner.close();    
     
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
-    reader.open();
+    reader.init();
     fileScanner  = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
         .getSeekableScanner(meta, schema, tablet.getProto(), schema);
     fileScanner.init();


Mime
View raw message