tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/2] tajo git commit: TAJO-1950: Query master uses too much memory during range shuffle.
Date Thu, 24 Dec 2015 09:19:21 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.1 8ddefef85 -> e3443c6df


http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 61c30dd..bad2510 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -25,17 +25,19 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequest;
@@ -53,6 +55,7 @@ import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -92,11 +95,13 @@ public class TaskImpl implements Task {
   private long endTime;
 
   private List<FileChunk> localChunks;
+  private List<FileChunk> remoteChunks;
   // TODO - to be refactored
   private ShuffleType shuffleType = null;
   private Schema finalSchema = null;
 
   private TupleComparator sortComp = null;
+  private final int maxUrlLength;
 
   public TaskImpl(final TaskRequest request,
                   final ExecutionBlockContext executionBlockContext) throws IOException {
@@ -122,6 +127,7 @@ public class TaskImpl implements Task {
     this.context.setDataChannel(request.getDataChannel());
     this.context.setEnforcer(request.getEnforcer());
     this.context.setState(TaskAttemptState.TA_PENDING);
+    this.maxUrlLength = systemConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
   }
 
   public void initPlan() throws IOException {
@@ -148,14 +154,15 @@ public class TaskImpl implements Task {
     }
 
     this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+    this.remoteChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
 
     LOG.info(String.format("* Task %s is initialized. InterQuery: %b, Shuffle: %s, Fragments:
%d, Fetches:%d, " +
         "Local dir: %s", request.getId(), interQuery, shuffleType, request.getFragments().size(),
         request.getFetches().size(), taskDir));
 
     if(LOG.isDebugEnabled()) {
-      for (FetchImpl f : request.getFetches()) {
-        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+      for (FetchProto f : request.getFetches()) {
+        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + Repartitioner.createSimpleURIs(maxUrlLength,
f));
       }
     }
 
@@ -250,6 +257,21 @@ public class TaskImpl implements Task {
 
   @Override
   public void fetch(ExecutorService fetcherExecutor) {
+    // Sort the execution order of fetch runners to increase the cache hit in pull server
+    fetcherRunners.sort(new Comparator<Fetcher>() {
+      @Override
+      public int compare(Fetcher f1, Fetcher f2) {
+        String strUri = f1.getURI().toString();
+        int index = strUri.lastIndexOf("&ta");
+        String taskIdStr1 = strUri.substring(index + "&ta".length());
+
+        strUri = f2.getURI().toString();
+        index = strUri.lastIndexOf("&ta");
+        String taskIdStr2 = strUri.substring(index + "&ta".length());
+        return taskIdStr1.compareTo(taskIdStr2);
+      }
+    });
+
     for (Fetcher f : fetcherRunners) {
       fetcherExecutor.submit(new FetchRunner(context, f));
     }
@@ -375,8 +397,7 @@ public class TaskImpl implements Task {
       if (broadcastTableNames.contains(inputTable)) {
         continue;
       }
-      File tableDir = new File(context.getFetchIn(), inputTable);
-      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+      FileFragment[] frags = localizeFetchedData(inputTable);
       context.updateAssignedFragments(inputTable, frags);
     }
   }
@@ -540,24 +561,22 @@ public class TaskImpl implements Task {
     return false;
   }
 
-  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+  private FileFragment[] localizeFetchedData(String name)
       throws IOException {
 
     Configuration c = new Configuration(systemConf);
     c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
     FileSystem fs = FileSystem.get(c);
-    Path tablePath = new Path(file.getAbsolutePath());
 
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    List<FileFragment> listTablets = new ArrayList<>();
     FileFragment tablet;
 
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus f : fileLists) {
-      if (f.getLen() == 0) {
-        continue;
+    for (FileChunk chunk : remoteChunks) {
+      if (name.equals(chunk.getEbId())) {
+        tablet = new FileFragment(name, fs.makeQualified(new Path(chunk.getFile().getPath())),
chunk.startOffset(),
+            chunk.length());
+        listTablets.add(tablet);
       }
-      tablet = new FileFragment(name, fs.makeQualified(f.getPath()), 0l, f.getLen());
-      listTablets.add(tablet);
     }
 
     // Special treatment for locally pseudo fetched chunks
@@ -604,11 +623,16 @@ public class TaskImpl implements Task {
             LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
           }
           try {
-            FileChunk fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched
!= null
-                && fetched.getFile() != null) {
-              if (fetched.fromRemote() == false) {
-                localChunks.add(fetched);
+            List<FileChunk> fetched = fetcher.get();
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+              for (FileChunk eachFetch : fetched) {
+                if (eachFetch.getFile() != null) {
+                  if (!eachFetch.fromRemote()) {
+                    localChunks.add(eachFetch);
+                  } else {
+                    remoteChunks.add(eachFetch);
+                  }
+                }
               }
               break;
             }
@@ -658,7 +682,7 @@ public class TaskImpl implements Task {
   }
 
   private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                        List<FetchImpl> fetches) throws IOException
{
+                                        List<FetchProto> fetches) throws IOException
{
 
     if (fetches.size() > 0) {
       Path inputDir = executionBlockContext.getLocalDirAllocator().
@@ -668,50 +692,59 @@ public class TaskImpl implements Task {
       int localStoreChunkCount = 0;
       File storeDir;
       File defaultStoreFile;
-      FileChunk storeChunk = null;
+      List<FileChunk> storeChunkList = new ArrayList<>();
       List<Fetcher> runnerList = Lists.newArrayList();
 
-      for (FetchImpl f : fetches) {
+      for (FetchProto f : fetches) {
         storeDir = new File(inputDir.toString(), f.getName());
         if (!storeDir.exists()) {
           if (!storeDir.mkdirs()) throw new IOException("Failed to create " + storeDir);
         }
 
-        for (URI uri : f.getURIs()) {
+        for (URI uri : Repartitioner.createFullURIs(maxUrlLength, f)) {
+          storeChunkList.clear();
           defaultStoreFile = new File(storeDir, "in_" + i);
           InetAddress address = InetAddress.getByName(uri.getHost());
 
           WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
           if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort())
{
 
-            storeChunk = getLocalStoredFileChunk(uri, systemConf);
+            List<FileChunk> localChunkCandidates = getLocalStoredFileChunk(uri, systemConf);
 
-            // When a range request is out of range, storeChunk will be NULL. This case is
normal state.
-            // So, we should skip and don't need to create storeChunk.
-            if (storeChunk == null || storeChunk.length() == 0) {
-              continue;
-            }
+            for (FileChunk localChunk : localChunkCandidates) {
+              // When a range request is out of range, storeChunk will be NULL. This case
is normal state.
+              // So, we should skip and don't need to create storeChunk.
+              if (localChunk == null || localChunk.length() == 0) {
+                continue;
+              }
 
-            if (storeChunk.getFile() != null && storeChunk.startOffset() > -1)
{
-              storeChunk.setFromRemote(false);
-              localStoreChunkCount++;
-            } else {
-              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-              storeChunk.setFromRemote(true);
+              if (localChunk.getFile() != null && localChunk.startOffset() > -1)
{
+                localChunk.setFromRemote(false);
+                localStoreChunkCount++;
+              } else {
+                localChunk = new FileChunk(defaultStoreFile, 0, -1);
+                localChunk.setFromRemote(true);
+              }
+              localChunk.setEbId(f.getName());
+              storeChunkList.add(localChunk);
             }
+
           } else {
-            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-            storeChunk.setFromRemote(true);
+            FileChunk remoteChunk = new FileChunk(defaultStoreFile, 0, -1);
+            remoteChunk.setFromRemote(true);
+            remoteChunk.setEbId(f.getName());
+            storeChunkList.add(remoteChunk);
           }
 
           // If we decide that intermediate data should be really fetched from a remote host,
storeChunk
           // represents a complete file. Otherwise, storeChunk may represent a complete file
or only a part of it
-          storeChunk.setEbId(f.getName());
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
-          runnerList.add(fetcher);
-          i++;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+          for (FileChunk eachChunk : storeChunkList) {
+            Fetcher fetcher = new Fetcher(systemConf, uri, eachChunk);
+            runnerList.add(fetcher);
+            i++;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Create a new Fetcher with storeChunk:" + eachChunk.toString());
+            }
           }
         }
       }
@@ -724,7 +757,7 @@ public class TaskImpl implements Task {
     }
   }
 
-  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException
{
+  private List<FileChunk> getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws
IOException {
     // Parse the URI
 
     // Parsing the URL into key-values
@@ -749,28 +782,37 @@ public class TaskImpl implements Task {
 
     // The working directory of Tajo worker for each query, including stage
     Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
-    List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
 
-    FileChunk chunk;
+    List<FileChunk> chunkList = new ArrayList<>();
     // If the stage requires a range shuffle
     if (shuffleType.equals("r")) {
 
-      Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf))
{
-        LOG.warn("Range shuffle - file not exist. " + outputPath);
-        return null;
+      final String startKey = params.get("start").get(0);
+      final String endKey = params.get("end").get(0);
+      final boolean last = params.get("final") != null;
+      final List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
+
+      long before = System.currentTimeMillis();
+      for (String eachTaskId : taskIds) {
+        Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+        if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(),
conf)) {
+          LOG.warn("Range shuffle - file not exist. " + outputPath);
+          continue;
+        }
+        Path path = executionBlockContext.getLocalFS().makeQualified(
+            executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(),
conf));
+
+        try {
+          FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey,
endKey, last);
+          chunkList.add(chunk);
+        } catch (Throwable t) {
+          LOG.error(t.getMessage(), t);
+          throw new IOException(t.getCause());
+        }
       }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(),
conf));
-      String startKey = params.get("start").get(0);
-      String endKey = params.get("end").get(0);
-      boolean last = params.get("final") != null;
-
-      try {
-        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
-      } catch (Throwable t) {
-        LOG.error(t.getMessage(), t);
-        return null;
+      long after = System.currentTimeMillis();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Index lookup time: " + (after - before) + " ms");
       }
 
       // If the stage requires a hash shuffle or a scattered hash shuffle
@@ -779,8 +821,7 @@ public class TaskImpl implements Task {
       Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId),
partId);
 
       if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf))
{
-        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
-        return null;
+        throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: "
+ partPath);
       }
       Path path = executionBlockContext.getLocalFS().makeQualified(
         executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(),
conf));
@@ -789,17 +830,16 @@ public class TaskImpl implements Task {
       long readLen = (offset >= 0 && length >= 0) ? length : file.length();
 
       if (startPos >= file.length()) {
-        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length()
+ "]");
-        return null;
+        throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length()
+ "]");
       }
-      chunk = new FileChunk(file, startPos, readLen);
+      FileChunk chunk = new FileChunk(file, startPos, readLen);
+      chunkList.add(chunk);
 
     } else {
-      LOG.error("Unknown shuffle type");
-      return null;
+      throw new IOException("Unknown shuffle type");
     }
 
-    return chunk;
+    return chunkList;
   }
 
   public static Path getTaskAttemptDir(TaskAttemptId quid) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-core/src/main/proto/ResourceProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto
index a24c840..5bf4929 100644
--- a/tajo-core/src/main/proto/ResourceProtos.proto
+++ b/tajo-core/src/main/proto/ResourceProtos.proto
@@ -80,15 +80,17 @@ message FetchProto {
     required ExecutionBlockIdProto executionBlockId = 4;
     required int32 partitionId = 5;
     required string name = 6;
-    optional string rangeParams = 7;
-    optional bool hasNext = 8 [default = false];
+    optional bytes range_start = 7;
+    optional bytes range_end = 8;
+    optional bool range_last_inclusive = 9;
+    optional bool has_next = 10 [default = false];
 
-    //repeated part
-    repeated int32 taskId = 9 [packed=true];
-    repeated int32 attemptId = 10 [packed=true];
+    // repeated part
+    repeated int32 task_id = 11 [packed = true];
+    repeated int32 attempt_id = 12 [packed = true];
 
-    optional int64 offset = 11;
-    optional int64 length = 12;
+    optional int64 offset = 13;
+    optional int64 length = 14;
 }
 
 message TaskStatusProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-core/src/main/resources/webapps/worker/task.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp
index f2f903b..3e17c8a 100644
--- a/tajo-core/src/main/resources/webapps/worker/task.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/task.jsp
@@ -21,26 +21,24 @@
 
 <%@ page import="org.apache.tajo.ExecutionBlockId" %>
 <%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.ResourceProtos.FetchProto" %>
 <%@ page import="org.apache.tajo.ResourceProtos.ShuffleFileOutput" %>
 <%@ page import="org.apache.tajo.TaskId" %>
 <%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
 <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="org.apache.tajo.querymaster.Query" %>
-<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.querymaster.Stage" %>
-<%@ page import="org.apache.tajo.querymaster.Task" %>
+<%@ page import="org.apache.tajo.querymaster.*" %>
 <%@ page import="org.apache.tajo.storage.DataLocation" %>
 <%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
 <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.FetchImpl" %>
 <%@ page import="org.apache.tajo.worker.TajoWorker" %>
 <%@ page import="java.net.URI" %>
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.Map" %>
 <%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.conf.TajoConf.ConfVars" %>
 
 <%
     String paramQueryId = request.getParameter("queryId");
@@ -64,6 +62,9 @@
         return;
     }
 
+    int maxUrlLength = tajoWorker.getConfig().getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+            ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+
     Query query = queryMasterTask.getQuery();
     Stage stage = query.getStage(ebid);
 
@@ -110,11 +111,11 @@
 
     String fetchInfo = "";
     delim = "";
-    for (Map.Entry<String, Set<FetchImpl>> e : task.getFetchMap().entrySet())
{
+    for (Map.Entry<String, Set<FetchProto>> e : task.getFetchMap().entrySet())
{
         fetchInfo += delim + "<b>" + e.getKey() + "</b>";
         delim = "<br/>";
-        for (FetchImpl f : e.getValue()) {
-            for (URI uri : f.getSimpleURIs()){
+        for (FetchProto f : e.getValue()) {
+            for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)){
                 fetchInfo += delim + uri;
             }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
index 472b967..fc87a2e 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.exception.ExceptionUtil;
 import org.apache.tajo.pullserver.retriever.DataRetriever;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 
@@ -84,7 +85,7 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR
     FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
 
     // Write the content.
-    if (file == null) {
+    if (file.length == 0) {
       HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
       if (!HttpHeaders.isKeepAlive(request)) {
         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -171,7 +172,8 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR
       return;
     }
 
-    LOG.error(cause.getMessage(), cause);
+    LOG.error(cause.getMessage());
+    ExceptionUtil.printStackTraceIfError(LOG, cause);
     if (ch.isActive()) {
       sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index d826127..56d7b5b 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.pullserver;
 
+import com.google.common.cache.*;
 import com.google.common.collect.Lists;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.Unpooled;
@@ -53,24 +54,28 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.InvalidURLException;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NettyUtils;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public class TajoPullServerService extends AbstractService {
@@ -88,6 +93,7 @@ public class TajoPullServerService extends AbstractService {
   private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
   private HttpChannelInitializer channelInitializer;
   private int sslFileBufferSize;
+  private int maxUrlLength;
 
   private ApplicationId appId;
   private FileSystem localFS;
@@ -107,25 +113,56 @@ public class TajoPullServerService extends AbstractService {
     new ConcurrentHashMap<String,String>();
   private String userName;
 
+  private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
+  private static int lowCacheHitCheckThreshold;
+
   public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
     "tajo.pullserver.ssl.file.buffer.size";
 
   public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
 
-  private static boolean STANDALONE = false;
+  private static final boolean STANDALONE;
 
   private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
   private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;
 
+  public static final String CHUNK_LENGTH_HEADER_NAME = "c";
+
+  static class CacheKey { // LoadingCache key: identifies the index reader of one shuffle output dir
+    private final Path path;      // output dir; the cache loader opens new Path(path, "index")
+    private final String queryId;
+    private final String ebSeqId; // presumably the stage (execution block) sequence id; confirm
+    // Fields are final: a key's equals/hashCode must never change while it sits in the cache.
+    public CacheKey(Path path, String queryId, String ebSeqId) {
+      this.path = path;
+      this.queryId = queryId;
+      this.ebSeqId = ebSeqId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof CacheKey) {
+        CacheKey other = (CacheKey) o;
+        return Objects.equals(this.path, other.path)
+            && Objects.equals(this.queryId, other.queryId)
+            && Objects.equals(this.ebSeqId, other.ebSeqId);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(path, queryId, ebSeqId);
+    }
+  }
+
   static {
     /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance
*/
     SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
     REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
 
     String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
-    if (!StringUtils.isEmpty(standalone)) {
-      STANDALONE = standalone.equalsIgnoreCase("true");
-    }
+    STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true");
   }
 
   @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
@@ -193,6 +230,9 @@ public class TajoPullServerService extends AbstractService {
 
       localFS = new LocalFileSystem();
 
+      maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+          ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+
       conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
           , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
       super.init(conf);
@@ -210,7 +250,7 @@ public class TajoPullServerService extends AbstractService {
     }
 
     ServerBootstrap bootstrap = selector.clone();
-    TajoConf tajoConf = (TajoConf)conf;
+    final TajoConf tajoConf = (TajoConf)conf;
     try {
       channelInitializer = new HttpChannelInitializer(tajoConf);
     } catch (Exception ex) {
@@ -219,20 +259,35 @@ public class TajoPullServerService extends AbstractService {
     bootstrap.childHandler(channelInitializer)
       .channel(NioServerSocketChannel.class);
 
-    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
-        ConfVars.PULLSERVER_PORT.defaultIntVal);
+    port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
     ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
         .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
         .syncUninterruptibly();
 
     accepted.add(future.channel());
     port = ((InetSocketAddress)future.channel().localAddress()).getPort();
-    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+    tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
     LOG.info(getName() + " listening on port " + port);
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
                                     DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
 
+    int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
+    int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
+
+    indexReaderCache = CacheBuilder.newBuilder()
+        .maximumSize(cacheSize)
+        .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
+        .removalListener(removalListener)
+        .build(
+            new CacheLoader<CacheKey, BSTIndexReader>() {
+              @Override
+              public BSTIndexReader load(CacheKey key) throws Exception {
+                return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index"));
+              }
+            }
+        );
+    lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);
 
     if (STANDALONE) {
       File pullServerPortFile = getPullServerPortFile();
@@ -312,6 +367,7 @@ public class TajoPullServerService extends AbstractService {
       }
 
       localFS.close();
+      indexReaderCache.invalidateAll();
     } catch (Throwable t) {
       LOG.error(t, t);
     } finally {
@@ -348,7 +404,7 @@ public class TajoPullServerService extends AbstractService {
 
       int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
           ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
-      pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("codec", new HttpServerCodec(maxUrlLength, 8192, maxChunkSize));
       pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", PullServer);
@@ -422,7 +478,6 @@ public class TajoPullServerService extends AbstractService {
   class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
 
     private final TajoConf conf;
-//    private final IndexCache indexCache;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
@@ -447,22 +502,36 @@ public class TajoPullServerService extends AbstractService {
     public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
             throws Exception {
 
-      if (request.getMethod() != HttpMethod.GET) {
-        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+      if (request.getDecoderResult().isFailure()) {
+        LOG.error("Http decoding failed. ", request.getDecoderResult().cause());
+        sendError(ctx, request.getDecoderResult().toString(), HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
-      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
-      processingStatusMap.put(request.getUri().toString(), processingStatus);
+      if (request.getMethod() == HttpMethod.DELETE) {
+        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+
+        clearIndexCache(request.getUri());
+        return;
+      } else if (request.getMethod() != HttpMethod.GET) {
+        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+        return;
+      }
 
       // Parsing the URL into key-values
       Map<String, List<String>> params = null;
       try {
         params = decodeParams(request.getUri());
       } catch (Throwable e) {
+        LOG.error("Failed to decode uri " + request.getUri());
         sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+        return;
       }
 
+      ProcessingStatus processingStatus = new ProcessingStatus(request.getUri());
+      processingStatusMap.put(request.getUri(), processingStatus);
+
       String partId = params.get("p").get(0);
       String queryId = params.get("qid").get(0);
       String shuffleType = params.get("type").get(0);
@@ -491,28 +560,33 @@ public class TajoPullServerService extends AbstractService {
 
       // if a stage requires a range shuffle
       if (shuffleType.equals("r")) {
-        Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
-        if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
-          LOG.warn(outputPath + "does not exist.");
-          sendError(ctx, HttpResponseStatus.NO_CONTENT);
-          return;
-        }
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(),
conf));
-        String startKey = params.get("start").get(0);
-        String endKey = params.get("end").get(0);
-        boolean last = params.get("final") != null;
-
-        FileChunk chunk;
-        try {
-          chunk = getFileChunks(path, startKey, endKey, last);
-        } catch (Throwable t) {
-          LOG.error("ERROR Request: " + request.getUri(), t);
-          sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
-          return;
-        }
-        if (chunk != null) {
-          chunks.add(chunk);
+        final String startKey = params.get("start").get(0);
+        final String endKey = params.get("end").get(0);
+        final boolean last = params.get("final") != null;
+
+        long before = System.currentTimeMillis();
+        for (String eachTaskId : taskIds) {
+          Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+          if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+            LOG.warn(outputPath + "does not exist.");
+            continue;
+          }
+          Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(),
conf));
+
+          FileChunk chunk;
+          try {
+            chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
+          } catch (Throwable t) {
+            LOG.error("ERROR Request: " + request.getUri(), t);
+            sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
+            return;
+          }
+          if (chunk != null) {
+            chunks.add(chunk);
+          }
         }
+        long after = System.currentTimeMillis();
+        LOG.info("Index lookup time: " + (after - before) + " ms");
 
         // if a stage requires a hash shuffle or a scattered hash shuffle
       } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
@@ -536,7 +610,9 @@ public class TajoPullServerService extends AbstractService {
           sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
           return;
         }
-        LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+        }
         FileChunk chunk = new FileChunk(file, startPos, readLen);
         chunks.add(chunk);
       } else {
@@ -545,8 +621,6 @@ public class TajoPullServerService extends AbstractService {
         return;
       }
 
-      processingStatus.setNumFiles(chunks.size());
-      processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
       // Write the content.
       if (chunks.size() == 0) {
         HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
@@ -562,9 +636,13 @@ public class TajoPullServerService extends AbstractService {
         ChannelFuture writeFuture = null;
         HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
         long totalSize = 0;
+        StringBuilder sb = new StringBuilder();
         for (FileChunk chunk : file) {
           totalSize += chunk.length();
+          sb.append(Long.toString(chunk.length())).append(",");
         }
+        sb.deleteCharAt(sb.length() - 1);
+        HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString());
         HttpHeaders.setContentLength(response, totalSize);
 
         if (HttpHeaders.isKeepAlive(request)) {
@@ -580,6 +658,7 @@ public class TajoPullServerService extends AbstractService {
             return;
           }
         }
+
         if (ctx.pipeline().get(SslHandler.class) == null) {
           writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
         } else {
@@ -594,6 +673,49 @@ public class TajoPullServerService extends AbstractService {
       }
     }
 
+    /**
+     * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
+     * It is called whenever an execution block is completed.
+     *
+     * @param uri request URI of the form {@code ebid=<execution block id>}; any other form throws IllegalArgumentException
+     * @throws IOException if force-closing one of the affected index readers fails
+     * @throws InvalidURLException
+     */
+    private void clearIndexCache(String uri) throws IOException, InvalidURLException {
+      // Simply parse the given uri
+      String[] tokens = uri.split("=");
+      if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+        throw new IllegalArgumentException("invalid params: " + uri);
+      }
+      ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+      String queryId = ebId.getQueryId().toString();
+      String ebSeqId = Integer.toString(ebId.getId());
+      List<CacheKey> removed = new ArrayList<>();
+      synchronized (indexReaderCache) {
+        for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+          CacheKey key = e.getKey();
+          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
+          }
+        }
+        indexReaderCache.invalidateAll(removed);
+      }
+      removed.clear();
+      synchronized (waitForRemove) {
+        for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+          CacheKey key = e.getKey();
+          if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
+            e.getValue().forceClose();
+            removed.add(e.getKey());
+          }
+        }
+        for (CacheKey eachKey : removed) {
+          waitForRemove.remove(eachKey);
+        }
+      }
+    }
+
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
                                    FileChunk file,
                                    String requestUri) throws IOException {
@@ -617,7 +739,7 @@ public class TajoPullServerService extends AbstractService {
           writeFuture = ctx.write(new HttpChunkedInput(chunk));
         }
       } catch (FileNotFoundException e) {
-        LOG.info(file.getFile() + " not found");
+        LOG.fatal(file.getFile() + " not found");
         return null;
       } catch (Throwable e) {
         if (spill != null) {
@@ -656,26 +778,68 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
-  public static FileChunk getFileChunks(Path outDir,
+  // Temporary holding area for evicted index readers that are still in use by index lookups
+  private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+
+  // RemovalListener is triggered when an item is removed from the index reader cache.
+  // It closes index readers when they are not used anymore.
+  // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
+  private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = new RemovalListener<CacheKey, BSTIndexReader>() {
+    @Override
+    public void onRemoval(RemovalNotification<CacheKey, BSTIndexReader> removal) {
+      BSTIndexReader reader = removal.getValue();
+      if (reader.getReferenceNum() == 0) {
+        try {
+          reader.close(); // tear down properly
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        waitForRemove.remove(removal.getKey());
+      } else {
+        waitForRemove.put(removal.getKey(), reader);
+      }
+    }
+  };
+
+  public static FileChunk getFileChunks(String queryId,
+                                        String ebSeqId,
+                                        Path outDir,
                                         String startKey,
                                         String endKey,
-                                        boolean last) throws IOException {
-    BSTIndex index = new BSTIndex(new TajoConf());
-    try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index")))
{
-      Schema keySchema = idxReader.getKeySchema();
-      TupleComparator comparator = idxReader.getComparator();
+                                        boolean last) throws IOException, ExecutionException
{
+
+    BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
+    idxReader.retain();
 
+    File data;
+    long startOffset;
+    long endOffset;
+    try {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey()
+ ")");
+        if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate()
< 0.5) {
+          LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
+        }
+      }
+
+      Tuple indexedFirst = idxReader.getFirstKey();
+      Tuple indexedLast = idxReader.getLastKey();
+
+      if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no contents");
+        }
+        return null;
       }
 
-      File data = new File(URI.create(outDir.toUri() + "/output"));
       byte[] startBytes = Base64.decodeBase64(startKey);
       byte[] endBytes = Base64.decodeBase64(endKey);
 
-      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
       Tuple start;
       Tuple end;
+      Schema keySchema = idxReader.getKeySchema();
+      RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
       try {
         start = decoder.toTuple(startBytes);
       } catch (Throwable t) {
@@ -690,23 +854,23 @@ public class TajoPullServerService extends AbstractService {
             + ", decoded byte size: " + endBytes.length, t);
       }
 
-      LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end="
+ end +
-          (last ? ", last=true" : "") + ")");
-
-      if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { //
if # of rows is zero
-        LOG.info("There is no contents");
-        return null;
+      data = new File(URI.create(outDir.toUri() + "/output"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end="
+ end +
+            (last ? ", last=true" : "") + ")");
       }
 
-      if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
-          comparator.compare(idxReader.getLastKey(), start) < 0) {
-        LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey()
+
-            "], but request start:" + start + ", end: " + end);
+      TupleComparator comparator = idxReader.getComparator();
+
+      if (comparator.compare(end, indexedFirst) < 0 ||
+          comparator.compare(indexedLast, start) < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
+              "], but request start:" + start + ", end: " + end);
+        }
         return null;
       }
 
-      long startOffset;
-      long endOffset;
       try {
         idxReader.init();
         startOffset = idxReader.find(start);
@@ -756,12 +920,14 @@ public class TajoPullServerService extends AbstractService {
           && comparator.compare(idxReader.getLastKey(), end) < 0)) {
         endOffset = data.length();
       }
+    } finally {
+      idxReader.release();
+    }
 
-      FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
 
-      if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
-      return chunk;
-    }
+    if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+    return chunk;
   }
 
   public static List<String> splitMaps(List<String> mapq) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index 7919ee7..4d1f3f8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -42,6 +42,8 @@ import java.nio.channels.FileChannel;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
 
@@ -440,6 +442,9 @@ public class BSTIndex implements IndexMethod {
     }
   }
 
+  private static final AtomicIntegerFieldUpdater<BSTIndexReader> REFERENCE_UPDATER =
+      AtomicIntegerFieldUpdater.newUpdater(BSTIndexReader.class, "referenceNum");
+
   /**
    * BSTIndexReader is thread-safe.
    */
@@ -468,6 +473,10 @@ public class BSTIndex implements IndexMethod {
 
     private RowStoreDecoder rowStoreDecoder;
 
+    private final AtomicBoolean inited = new AtomicBoolean(false);
+    // Updated through REFERENCE_UPDATER, which requires this to remain a volatile int field.
+    volatile int referenceNum;
+
     /**
      *
      * @param fileName
@@ -488,6 +497,25 @@ public class BSTIndex implements IndexMethod {
       open();
     }
 
+    /**
+     * Atomically increments the reference count; concurrent callers never lose an update.
+     */
+    public void retain() {
+      REFERENCE_UPDATER.incrementAndGet(this);
+    }
+
+    /**
+     * Atomically decrements the reference count; concurrent callers never lose an update.
+     * This method must be called before {@link #close()}.
+     */
+    public void release() {
+      REFERENCE_UPDATER.decrementAndGet(this);
+    }
+
+    public int getReferenceNum() {
+      return referenceNum;
+    }
+
     public Schema getKeySchema() {
       return this.keySchema;
     }
@@ -543,8 +571,12 @@ public class BSTIndex implements IndexMethod {
       byteBuf.release();
     }
 
-    public void init() throws IOException {
-      fillData();
+    public synchronized void init() throws IOException {
+      // Mark initialized only after fillData() succeeds so that a failed load is retried.
+      if (!inited.get()) {
+        fillData();
+        inited.set(true);
+      }
     }
 
     private void open()
@@ -684,6 +714,8 @@ public class BSTIndex implements IndexMethod {
       } catch (IOException e) {
         //TODO this block should fix correctly
         counter--;
+        if (counter == 0)
+          LOG.info("counter: " + counter);
         if (pos != -1) {
           in.seek(pos);
         }
@@ -765,6 +797,9 @@ public class BSTIndex implements IndexMethod {
 
       //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
       int centerPos = (start + end) >>> 1;
+      if (arr.length == 0) {
+        LOG.error("arr.length: 0, loadNum: " + loadNum + ", inited: " + inited.get());
+      }
       while (true) {
         if (comparator.compare(arr[centerPos], key) > 0) {
           if (centerPos == 0) {
@@ -800,8 +835,23 @@ public class BSTIndex implements IndexMethod {
       return offset;
     }
 
+    /**
+     * Closes the underlying index input only if no caller holds a reference; otherwise does nothing.
+     */
     @Override
     public void close() throws IOException {
+      if (referenceNum == 0) {
+        this.indexIn.close();
+      }
+    }
+
+    /**
+     * Resets the reference count to zero and closes the index input, even if it is still in use.
+     *
+     * @throws IOException if closing the underlying index input fails
+     */
+    public void forceClose() throws IOException {
+      REFERENCE_UPDATER.set(this, 0);
       this.indexIn.close();
     }
 


Mime
View raw message