tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject tajo git commit: TAJO-1295: Remove legacy worker.dataserver package and its unit tests.
Date Sun, 11 Jan 2015 04:30:00 GMT
Repository: tajo
Updated Branches:
  refs/heads/master c45d0ef02 -> bc478ba83


TAJO-1295: Remove legacy worker.dataserver package and its unit tests.

Closes #345


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bc478ba8
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bc478ba8
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bc478ba8

Branch: refs/heads/master
Commit: bc478ba834e9bba768155faa53f918e495a74671
Parents: c45d0ef
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Sun Jan 11 02:04:29 2015 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Sun Jan 11 02:04:29 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/worker/InterDataRetriever.java  | 113 ------
 .../tajo/worker/PartitionRetrieverHandler.java  |  44 ---
 .../tajo/worker/RangeRetrieverHandler.java      | 163 --------
 .../FileAccessForbiddenException.java           |  40 --
 .../tajo/worker/dataserver/HttpDataServer.java  |  87 -----
 .../dataserver/HttpDataServerHandler.java       | 199 ----------
 .../HttpDataServerPipelineFactory.java          |  55 ---
 .../apache/tajo/worker/dataserver/HttpUtil.java |  69 ----
 .../retriever/AdvancedDataRetriever.java        | 128 -------
 .../dataserver/retriever/DataRetriever.java     |  29 --
 .../retriever/DirectoryRetriever.java           |  56 ---
 .../worker/dataserver/retriever/FileChunk.java  |  51 ---
 .../dataserver/retriever/RetrieverHandler.java  |  33 --
 .../planner/physical/TestPhysicalPlanner.java   | 103 +----
 .../tajo/worker/TestRangeRetrieverHandler.java  | 381 -------------------
 .../worker/dataserver/TestHttpDataServer.java   | 172 ---------
 .../tajo/worker/dataserver/TestHttpUtil.java    |  49 ---
 18 files changed, 4 insertions(+), 1771 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5f427a9..5ff3143 100644
--- a/CHANGES
+++ b/CHANGES
@@ -284,6 +284,9 @@ Release 0.9.1 - unreleased
 
   TASKS
 
+    TAJO-1295: Remove legacy worker.dataserver package and its unit tests.
+    (hyunsik)
+
     TAJO-1296: Remove obsolete classes from tajo.master.container package.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
deleted file mode 100644
index 5b2ad0f..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.worker;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-@Deprecated
-public class InterDataRetriever implements DataRetriever {
-  private final Log LOG = LogFactory.getLog(InterDataRetriever.class);
-  private final Set<TaskId> registered = Sets.newHashSet();
-  private final Map<String, String> map = Maps.newConcurrentMap();
-
-  public InterDataRetriever() {
-  }
-  
-  public void register(TaskId id, String baseURI) {
-    synchronized (registered) {
-      if (!registered.contains(id)) {      
-        map.put(id.toString(), baseURI);
-        registered.add(id);      
-      }
-    } 
-  }
-  
-  public void unregister(TaskId id) {
-    synchronized (registered) {
-      if (registered.contains(id)) {
-        map.remove(id.toString());
-        registered.remove(id);
-      }
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
-   */
-  @Override
-  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException {
-       
-    int start = request.getUri().indexOf('?');
-    if (start < 0) {
-      throw new IllegalArgumentException("Wrong request: " + request.getUri());
-    }
-    
-    String queryStr = request.getUri().substring(start + 1);
-    LOG.info("QUERY: " + queryStr);
-    String [] queries = queryStr.split("&");
-    
-    String qid = null;
-    String fn = null;
-    String [] kv;
-    for (String query : queries) {
-      kv = query.split("=");
-      if (kv[0].equals("qid")) {
-        qid = kv[1];
-      } else if (kv[0].equals("fn")) {
-        fn = kv[1];
-      }
-    }
-    
-    String baseDir = map.get(qid);
-    if (baseDir == null) {
-      throw new FileNotFoundException("No such qid: " + qid);
-    }
-
-    File file = new File(baseDir + "/" + fn);
-    if (file.isHidden() || !file.exists()) {
-      throw new FileNotFoundException("No such file: " + baseDir + "/" 
-          + file.getName());
-    }
-    if (!file.isFile()) {
-      throw new FileAccessForbiddenException("No such file: " 
-          + baseDir + "/" + file.getName()); 
-    }
-    
-    return new FileChunk[] {new FileChunk(file, 0, file.length())};
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
deleted file mode 100644
index 36e7353..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class PartitionRetrieverHandler implements RetrieverHandler {
-  private final String baseDir;
-
-  public PartitionRetrieverHandler(String baseDir) {
-    this.baseDir = baseDir;
-  }
-
-  @Override
-  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
-    // nothing to verify the file because AdvancedDataRetriever checks
-    // its validity of the file.
-    File file = new File(baseDir + "/" + kvs.get("fn").get(0));
-
-    return new FileChunk(file, 0, file.length());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
deleted file mode 100644
index 2b58196..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- * It retrieves the file chunk ranged between start and end keys.
- * The start key is inclusive, but the end key is exclusive.
- *
- * Internally, there are four cases:
- * <ul>
- *   <li>out of scope: the index range does not overlapped with the query range.</li>
- *   <li>overlapped: the index range is partially overlapped with the query range. </li>
- *   <li>included: the index range is included in the start and end keys</li>
- *   <li>covered: the index range covers the query range (i.e., start and end keys).</li>
- * </ul>
- */
-public class RangeRetrieverHandler implements RetrieverHandler {
-  private static final Log LOG = LogFactory.getLog(RangeRetrieverHandler.class);
-  private final File file;
-  private final BSTIndex.BSTIndexReader idxReader;
-  private final Schema schema;
-  private final BaseTupleComparator comp;
-  private final RowStoreDecoder decoder;
-
-  public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator comp) throws IOException {
-    this.file = outDir;
-    BSTIndex index = new BSTIndex(new TajoConf());
-    this.schema = schema;
-    this.comp = comp;
-    FileSystem fs = FileSystem.getLocal(new Configuration());
-    Path indexPath = fs.makeQualified(new Path(outDir.getCanonicalPath(), "index"));
-    this.idxReader =
-        index.getIndexReader(indexPath, this.schema, this.comp);
-    this.idxReader.open();
-    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
-        + idxReader.getLastKey());
-    this.decoder = RowStoreUtil.createDecoder(schema);
-  }
-
-  @Override
-  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
-    // nothing to verify the file because AdvancedDataRetriever checks
-    // its validity of the file.
-    File data = new File(this.file, "data/data");
-    byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0));
-    Tuple start = decoder.toTuple(startBytes);
-    byte [] endBytes;
-    Tuple end;
-    endBytes = Base64.decodeBase64(kvs.get("end").get(0));
-    end = decoder.toTuple(endBytes);
-    boolean last = kvs.containsKey("final");
-
-    if(!comp.isAscendingFirstKey()) {
-      Tuple tmpKey = start;
-      start = end;
-      end = tmpKey;
-    }
-
-    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
-        (last ? ", last=true" : "") + ")");
-
-    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
-      LOG.info("There is no contents");
-      return null;
-    }
-
-    if (comp.compare(end, idxReader.getFirstKey()) < 0 ||
-        comp.compare(idxReader.getLastKey(), start) < 0) {
-      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
-          "], but request start:" + start + ", end: " + end);
-      return null;
-    }
-
-    long startOffset;
-    long endOffset;
-    try {
-      startOffset = idxReader.find(start);
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
-    try {
-      endOffset = idxReader.find(end);
-      if (endOffset == -1) {
-        endOffset = idxReader.find(end, true);
-      }
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
-
-    // if startOffset == -1 then case 2-1 or case 3
-    if (startOffset == -1) { // this is a hack
-      // if case 2-1 or case 3
-      try {
-        startOffset = idxReader.find(start, true);
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-        throw ioe;
-      }
-    }
-
-    if (startOffset == -1) {
-      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
-          "State Dump (the requested range: "
-          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-    }
-
-    // if greater than indexed values
-    if (last || (endOffset == -1 && comp.compare(idxReader.getLastKey(), end) < 0)) {
-      endOffset = data.length();
-    }
-
-    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-    LOG.info("Retrieve File Chunk: " + chunk);
-    return chunk;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
deleted file mode 100644
index 6c93e4f..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import java.io.IOException;
-
-public class FileAccessForbiddenException extends IOException {
-  private static final long serialVersionUID = -3383272565826389213L;
-
-  public FileAccessForbiddenException() {
-  }
-
-  public FileAccessForbiddenException(String message) {
-    super(message);
-  }
-
-  public FileAccessForbiddenException(Throwable cause) {
-    super(cause);
-  }
-
-  public FileAccessForbiddenException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
deleted file mode 100644
index 523d6e1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-public class HttpDataServer {
-  private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
-
-  private final InetSocketAddress addr;
-  private InetSocketAddress bindAddr;
-  private ServerBootstrap bootstrap = null;
-  private ChannelFactory factory = null;
-  private ChannelGroup channelGroup = null;
-
-  public HttpDataServer(final InetSocketAddress addr, 
-      final DataRetriever retriever) {
-    this.addr = addr;
-    this.factory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-        Runtime.getRuntime().availableProcessors() * 2);
-
-    // Configure the server.
-    this.bootstrap = new ServerBootstrap(factory);
-    // Set up the event pipeline factory.
-    this.bootstrap.setPipelineFactory(
-        new HttpDataServerPipelineFactory(retriever));    
-    this.channelGroup = new DefaultChannelGroup();
-  }
-
-  public HttpDataServer(String bindaddr, DataRetriever retriever) {
-    this(NetUtils.createSocketAddr(bindaddr), retriever);
-  }
-
-  public void start() {
-    // Bind and start to accept incoming connections.
-    Channel channel = bootstrap.bind(addr);
-    channelGroup.add(channel);    
-    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
-    LOG.info("HttpDataServer starts up ("
-        + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
-        + ")");
-  }
-  
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-
-  public void stop() {
-    ChannelGroupFuture future = channelGroup.close();
-    future.awaitUninterruptibly();
-    factory.releaseExternalResources();
-
-    LOG.info("HttpDataServer shutdown ("
-        + this.bindAddr.getAddress().getHostAddress() + ":"
-        + this.bindAddr.getPort() + ")");
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
deleted file mode 100644
index 6b9eea8..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-
-import java.io.*;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
-  private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
-  private final DataRetriever retriever;
-
-  public HttpDataServerHandler(DataRetriever retriever) {
-    this.retriever = retriever;
-  }
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-      throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
-      return;
-    }
-
-    FileChunk [] file;
-    try {
-      file = retriever.handle(ctx, request);
-    } catch (FileNotFoundException fnf) {
-      LOG.error(fnf);
-      sendError(ctx, NOT_FOUND);
-      return;
-    } catch (IllegalArgumentException iae) {
-      LOG.error(iae);
-      sendError(ctx, BAD_REQUEST);
-      return;
-    } catch (FileAccessForbiddenException fafe) {
-      LOG.error(fafe);
-      sendError(ctx, FORBIDDEN);
-      return;
-    } catch (IOException ioe) {
-      LOG.error(ioe);
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-      return;
-    }
-
-    // Write the content.
-    Channel ch = e.getChannel();
-    if (file == null) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-      ch.write(response);
-      if (!isKeepAlive(request)) {
-        ch.close();
-      }
-    }  else {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-      long totalSize = 0;
-      for (FileChunk chunk : file) {
-        totalSize += chunk.length();
-      }
-      setContentLength(response, totalSize);
-
-      // Write the initial line and the header.
-      ch.write(response);
-
-      ChannelFuture writeFuture = null;
-
-      for (FileChunk chunk : file) {
-        writeFuture = sendFile(ctx, ch, chunk);
-        if (writeFuture == null) {
-          sendError(ctx, NOT_FOUND);
-          return;
-        }
-      }
-
-      // Decide whether to close the connection or not.
-      if (!isKeepAlive(request)) {
-        // Close the connection when the whole content is written out.
-        writeFuture.addListener(ChannelFutureListener.CLOSE);
-      }
-    }
-  }
-
-  private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException {
-    RandomAccessFile raf;
-    try {
-      raf = new RandomAccessFile(file.getFile(), "r");
-    } catch (FileNotFoundException fnfe) {
-      return null;
-    }
-
-    ChannelFuture writeFuture;
-    if (ch.getPipeline().get(SslHandler.class) != null) {
-      // Cannot use zero-copy with HTTPS.
-      writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192));
-    } else {
-      // No encryption - use zero-copy.
-      final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length());
-      writeFuture = ch.write(region);
-      writeFuture.addListener(new ChannelFutureListener() {
-        public void operationComplete(ChannelFuture future) {
-          region.releaseExternalResources();
-        }
-      });
-    }
-
-    return writeFuture;
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-      throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
-    if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
-      return;
-    }
-
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  public static String sanitizeUri(String uri) {
-    // Decode the path.
-    try {
-      uri = URLDecoder.decode(uri, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      try {
-        uri = URLDecoder.decode(uri, "ISO-8859-1");
-      } catch (UnsupportedEncodingException e1) {
-        throw new Error();
-      }
-    }
-
-    // Convert file separators.
-    uri = uri.replace('/', File.separatorChar);
-
-    // Simplistic dumb security check.
-    // You will have to do something serious in the production environment.
-    if (uri.contains(File.separator + ".")
-        || uri.contains("." + File.separator) || uri.startsWith(".")
-        || uri.endsWith(".")) {
-      return null;
-    }
-
-    // Convert to absolute path.
-    return uri;
-  }
-
-  private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
-
-    // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index 0a47f6b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
-  private final DataRetriever ret;
-
-  public HttpDataServerPipelineFactory(DataRetriever ret) {
-    this.ret = ret;
-  }
-
-  public ChannelPipeline getPipeline() throws Exception {
-    // Create a default pipeline implementation.
-    ChannelPipeline pipeline = pipeline();
-
-    // Uncomment the following line if you want HTTPS
-    // SSLEngine engine =
-    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
-    // engine.setUseClientMode(false);
-    // pipeline.addLast("ssl", new SslHandler(engine));
-
-    pipeline.addLast("decoder", new HttpRequestDecoder());
-    //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
-    pipeline.addLast("encoder", new HttpResponseEncoder());
-    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-    //pipeline.addLast("deflater", new HttpContentCompressor());
-    pipeline.addLast("handler", new HttpDataServerHandler(ret));
-    return pipeline;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
deleted file mode 100644
index e688c39..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import com.google.common.collect.Maps;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.util.Map;
-
-public class HttpUtil {
-  public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
-    return getParamsFromQuery(uri.getQuery());
-  }
-
-  /**
-   * It parses a query string into key/value pairs
-   *
-   * @param queryString decoded query string
-   * @return key/value pairs parsed from a given query string
-   * @throws UnsupportedEncodingException
-   */
-  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
-    String [] queries = queryString.split("&");
-
-    Map<String,String> params = Maps.newHashMap();
-    String [] param;
-    for (String q : queries) {
-      param = q.split("=");
-      params.put(param[0], param[1]);
-    }
-
-    return params;
-  }
-
-  public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
-    StringBuilder sb = new StringBuilder();
-
-    boolean first = true;
-    for (Map.Entry<String,String> param : params.entrySet()) {
-      if (!first) {
-        sb.append("&");
-      }
-      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
-          append("=").
-          append(URLEncoder.encode(param.getValue(), "UTF-8"));
-      first = false;
-    }
-
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
deleted file mode 100644
index 9c15d0c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class AdvancedDataRetriever implements DataRetriever {
-  private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
-  private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
-
-  public AdvancedDataRetriever() {
-  }
-  
-  public void register(TaskAttemptId id, RetrieverHandler handler) {
-    synchronized (handlerMap) {
-      if (!handlerMap.containsKey(id.toString())) {
-        handlerMap.put(id.toString(), handler);
-      }
-    } 
-  }
-  
-  public void unregister(TaskAttemptId id) {
-    synchronized (handlerMap) {
-      if (handlerMap.containsKey(id.toString())) {
-        handlerMap.remove(id.toString());
-      }
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
-   */
-  @Override
-  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException {
-
-    final Map<String, List<String>> params =
-      new QueryStringDecoder(request.getUri()).getParameters();
-
-    if (!params.containsKey("qid")) {
-      throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
-    }
-
-    if (params.containsKey("sid")) {
-      List<FileChunk> chunks = Lists.newArrayList();
-      List<String> qids = splitMaps(params.get("qid"));
-      for (String qid : qids) {
-        String[] ids = qid.split("_");
-        ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
-        TaskId quid = new TaskId(suid, Integer.parseInt(ids[0]));
-        TaskAttemptId attemptId = new TaskAttemptId(quid,
-            Integer.parseInt(ids[1]));
-        RetrieverHandler handler = handlerMap.get(attemptId.toString());
-        FileChunk chunk = handler.get(params);
-        chunks.add(chunk);
-      }
-      return chunks.toArray(new FileChunk[chunks.size()]);
-    } else {
-      RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
-      FileChunk chunk = handler.get(params);
-      if (chunk == null) {
-        if (params.containsKey("qid")) { // if there is no content corresponding to the query
-          return null;
-        } else { // if there is no
-          throw new FileNotFoundException("No such a file corresponding to " + params.get("qid"));
-        }
-      }
-
-      File file = chunk.getFile();
-      if (file.isHidden() || !file.exists()) {
-        throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
-      }
-      if (!file.isFile()) {
-        throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file");
-      }
-
-      return new FileChunk[] {chunk};
-    }
-  }
-
-  private List<String> splitMaps(List<String> qids) {
-    if (null == qids) {
-      LOG.error("QueryId is EMPTY");
-      return null;
-    }
-
-    final List<String> ret = new ArrayList<String>();
-    for (String qid : qids) {
-      Collections.addAll(ret, qid.split(","));
-    }
-    return ret;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
deleted file mode 100644
index b26ba74..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-
-import java.io.IOException;
-
-public interface DataRetriever {
-  FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
deleted file mode 100644
index 62dabbd..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
-import org.apache.tajo.worker.dataserver.HttpDataServerHandler;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-public class DirectoryRetriever implements DataRetriever {
-  public String baseDir;
-  
-  public DirectoryRetriever(String baseDir) {
-    this.baseDir = baseDir;
-  }
-
-  @Override
-  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException {
-    final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
-    if (path == null) {
-      throw new IllegalArgumentException("Wrong path: " +path);
-    }
-
-    File file = new File(baseDir, path);
-    if (file.isHidden() || !file.exists()) {
-      throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
-    }
-    if (!file.isFile()) {
-      throw new FileAccessForbiddenException("No such file: " 
-          + baseDir + "/" + path); 
-    }
-    
-    return new FileChunk[] {new FileChunk(file, 0, file.length())};
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
deleted file mode 100644
index 4f11168..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-public class FileChunk {
-  private final File file;
-  private final long startOffset;
-  private final long length;
-
-  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
-    this.file = file;
-    this.startOffset = startOffset;
-    this.length = length;
-  }
-
-  public File getFile() {
-    return this.file;
-  }
-
-  public long startOffset() {
-    return this.startOffset;
-  }
-
-  public long length() {
-    return this.length;
-  }
-
-  public String toString() {
-    return " (start=" + startOffset() + ", length=" + length + ") "
-        + file.getAbsolutePath();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
deleted file mode 100644
index e5479cc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public interface RetrieverHandler {
-  /**
-   *
-   * @param kvs url-decoded key/value pairs
-   * @return a desired part of a file
-   * @throws IOException
-   */
-  public FileChunk get(Map<String, List<String>> kvs) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index d3ab1fd..6c606b1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,7 +42,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -53,27 +50,22 @@ import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.RangeRetrieverHandler;
 import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
@@ -1044,99 +1036,6 @@ public class TestPhysicalPlanner {
   };
 
   @Test
-  public final void testIndexedStoreExec() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
-        new Path(employee.getPath()), Integer.MAX_VALUE);
-
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
-        new FileFragment[] {frags[0]}, workDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr context = analyzer.parse(SORT_QUERY[0]);
-    LogicalPlan plan = planner.createPlan(defaultContext, context);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
-    DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
-        ShuffleType.RANGE_SHUFFLE);
-    channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
-    ctx.setDataChannel(channel);
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
-    Tuple tuple;
-    exec.init();
-    exec.next();
-    exec.close();
-
-    Schema keySchema = new Schema();
-    keySchema.addColumn("?empId", Type.INT4);
-    SortSpec[] sortSpec = new SortSpec[1];
-    sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false);
-    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec);
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"),
-        keySchema, comp);
-    reader.open();
-    Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
-    TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet());
-    SeekableScanner scanner =
-        FileStorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
-    scanner.init();
-
-    int cnt = 0;
-
-    while(scanner.next() != null) {
-      cnt++;
-    }
-    scanner.reset();
-
-    assertEquals(100 ,cnt);
-
-    Tuple keytuple = new VTuple(1);
-    for(int i = 1 ; i < 100 ; i ++) {
-      keytuple.put(0, DatumFactory.createInt4(i));
-      long offsets = reader.find(keytuple);
-      scanner.seek(offsets);
-      tuple = scanner.next();
-
-      assertTrue("[seek check " + (i) + " ]", ("name_" + i).equals(tuple.get(0).asChars()));
-      assertTrue("[seek check " + (i) + " ]" , i == tuple.get(1).asInt4());
-    }
-
-
-    // The below is for testing RangeRetrieverHandler.
-    RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema);
-    RangeRetrieverHandler handler = new RangeRetrieverHandler(
-        new File(new Path(workDir, "output").toUri()), keySchema, comp);
-    Map<String,List<String>> kvs = Maps.newHashMap();
-    Tuple startTuple = new VTuple(1);
-    startTuple.put(0, DatumFactory.createInt4(50));
-    kvs.put("start", Lists.newArrayList(
-        new String(Base64.encodeBase64(
-            encoder.toBytes(startTuple), false))));
-    Tuple endTuple = new VTuple(1);
-    endTuple.put(0, DatumFactory.createInt4(80));
-    kvs.put("end", Lists.newArrayList(
-        new String(Base64.encodeBase64(
-            encoder.toBytes(endTuple), false))));
-    FileChunk chunk = handler.get(kvs);
-
-    scanner.seek(chunk.startOffset());
-    keytuple = scanner.next();
-    assertEquals(50, keytuple.get(1).asInt4());
-
-    long endOffset = chunk.startOffset() + chunk.length();
-    while((keytuple = scanner.next()) != null && scanner.getNextOffset() <= endOffset) {
-      assertTrue(keytuple.get(1).asInt4() <= 80);
-    }
-
-    scanner.close();
-  }
-
-  @Test
   public final void testSortEnforcer() throws IOException, PlanningException {
     FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
deleted file mode 100644
index 200ba31..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.ExternalSortExec;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.planner.physical.ProjectionExec;
-import org.apache.tajo.engine.planner.physical.RangeShuffleFileWriteExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRangeRetrieverHandler {
-  private TajoTestingCluster util;
-  private TajoConf conf;
-  private CatalogService catalog;
-  private SQLAnalyzer analyzer;
-  private LogicalPlanner planner;
-  private LogicalOptimizer optimizer;
-  private StorageManager sm;
-  private Schema schema;
-  private static int TEST_TUPLE = 10000;
-  private FileSystem fs;
-  private Path testDir;
-
-  @Before
-  public void setUp() throws Exception {
-    util = new TajoTestingCluster();
-    conf = util.getConfiguration();
-    testDir = CommonTestingUtil.getTestDir();
-    fs = testDir.getFileSystem(conf);
-    util.startCatalogCluster();
-    catalog = util.getMiniCatalogCluster().getCatalog();
-    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
-    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = StorageManager.getFileStorageManager(conf, testDir);
-
-    analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer(conf);
-
-    schema = new Schema();
-    schema.addColumn("empid", Type.INT4);
-    schema.addColumn("age", Type.INT4);
-  }
-
-  @After
-  public void tearDown() {
-    util.shutdownCatalogCluster();
-  }
-
-  public String [] SORT_QUERY = {
-      "select empId, age from employee order by empId, age",
-      "select empId, age from employee order by empId desc, age desc"
-  };
-
-  @Test
-  public void testGet() throws Exception {
-    Tuple firstTuple = null;
-    Tuple lastTuple;
-
-    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
-
-    Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
-    fs.mkdirs(tableDir.getParent());
-    Appender appender = ((FileStorageManager)sm).getAppender(employeeMeta, schema, tableDir);
-    appender.init();
-
-    Tuple tuple = new VTuple(schema.size());
-    for (int i = 0; i < TEST_TUPLE; i++) {
-      tuple.put(
-          new Datum[] {
-              DatumFactory.createInt4(i),
-              DatumFactory.createInt4(i + 5)
-          });
-      appender.addTuple(tuple);
-
-      if (firstTuple == null) {
-        firstTuple = new VTuple(tuple);
-      }
-    }
-    lastTuple = new VTuple(tuple);
-    appender.flush();
-    appender.close();
-
-    TableDesc employee = new TableDesc(
-        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, employeeMeta,
-        tableDir.toUri());
-    catalog.createTable(employee);
-
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tableDir, Integer.MAX_VALUE);
-
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(),
-        new FileFragment[] {frags[0]}, testDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr expr = analyzer.parse(SORT_QUERY[0]);
-    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
-    ExternalSortExec sort = null;
-    if (exec instanceof ProjectionExec) {
-      ProjectionExec projExec = (ProjectionExec) exec;
-      sort = projExec.getChild();
-    } else if (exec instanceof ExternalSortExec) {
-      sort = (ExternalSortExec) exec;
-    } else {
-      assertTrue(false);
-    }
-
-    SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
-    RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort, sort.getSchema(),
-        sort.getSchema(), sortSpecs);
-
-    exec = idxStoreExec;
-    exec.init();
-    exec.next();
-    exec.close();
-
-    Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndex.BSTIndexReader reader = bst.getIndexReader(
-        new Path(testDir, "output/index"), keySchema, comp);
-    reader.open();
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema,
-        StorageUtil.concatPath(testDir, "output", "output"));
-
-    scanner.init();
-    int cnt = 0;
-    while(scanner.next() != null) {
-      cnt++;
-    }
-    scanner.reset();
-
-    assertEquals(TEST_TUPLE ,cnt);
-
-    Tuple keytuple = new VTuple(2);
-    for(int i = 1 ; i < TEST_TUPLE ; i ++) {
-      keytuple.put(0, DatumFactory.createInt4(i));
-      keytuple.put(1, DatumFactory.createInt4(i + 5));
-      long offsets = reader.find(keytuple);
-      scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("[seek check " + (i) + " ]" , i == tuple.get(0).asInt4());
-      //assertTrue("[seek check " + (i) + " ]" , ("name_" + i).equals(tuple.get(1).asChars()));
-    }
-
-    TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple);
-    UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true);
-    TupleRange [] partitions = partitioner.partition(7);
-
-    // The below is for testing RangeRetrieverHandler.
-    RangeRetrieverHandler handler = new RangeRetrieverHandler(
-        new File((new Path(testDir, "output")).toUri()), keySchema, comp);
-
-    List<Long []> offsets = new ArrayList<Long []>();
-
-    for (int i = 0; i < partitions.length; i++) {
-      FileChunk chunk = getFileChunk(handler, keySchema, partitions[i], i == (partitions.length - 1));
-      offsets.add(new Long[] {chunk.startOffset(), chunk.length()});
-    }
-    scanner.close();
-
-    Long[] previous = null;
-    for (Long [] offset : offsets) {
-      if (offset[0] == 0 && previous == null) {
-        previous = offset;
-        continue;
-      }
-      assertTrue(previous[0] + previous[1] == offset[0]);
-      previous = offset;
-    }
-    long fileLength = new File((new Path(testDir, "index").toUri())).length();
-    assertTrue(previous[0] + previous[1] == fileLength);
-  }
-
-  @Test
-  public void testGetFromDescendingOrder() throws Exception {
-    Tuple firstTuple = null;
-    Tuple lastTuple;
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-    Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv");
-    fs.mkdirs(tablePath.getParent());
-    Appender appender = ((FileStorageManager)sm).getAppender(meta, schema, tablePath);
-    appender.init();
-    Tuple tuple = new VTuple(schema.size());
-    for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) {
-      tuple.put(
-          new Datum[] {
-              DatumFactory.createInt4(i),
-              DatumFactory.createInt4(i + 5)
-          });
-      appender.addTuple(tuple);
-
-      if (firstTuple == null) {
-        firstTuple = new VTuple(tuple);
-      }
-    }
-    lastTuple = new VTuple(tuple);
-    appender.flush();
-    appender.close();
-
-    TableDesc employee = new TableDesc(
-        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta, tablePath.toUri());
-    catalog.createTable(employee);
-
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
-
-    TaskAttemptContext
-        ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(),
-        new FileFragment[] {frags[0]}, testDir);
-    ctx.setEnforcer(new Enforcer());
-    Expr expr = analyzer.parse(SORT_QUERY[1]);
-    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
-    ExternalSortExec sort = null;
-    if (exec instanceof ProjectionExec) {
-      ProjectionExec projExec = (ProjectionExec) exec;
-      sort = projExec.getChild();
-    } else if (exec instanceof ExternalSortExec) {
-      sort = (ExternalSortExec) exec;
-    } else {
-      assertTrue(false);
-    }
-
-    SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
-    RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort,
-        sort.getSchema(), sort.getSchema(), sortSpecs);
-
-    exec = idxStoreExec;
-    exec.init();
-    exec.next();
-    exec.close();
-
-    Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
-    BSTIndex bst = new BSTIndex(conf);
-    BSTIndex.BSTIndexReader reader = bst.getIndexReader(
-        new Path(testDir, "output/index"), keySchema, comp);
-    reader.open();
-    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, outputMeta, schema,
-        StorageUtil.concatPath(testDir, "output", "output"));
-    scanner.init();
-    int cnt = 0;
-    while(scanner.next() != null) {
-      cnt++;
-    }
-    scanner.reset();
-
-    assertEquals(TEST_TUPLE ,cnt);
-
-    Tuple keytuple = new VTuple(2);
-    for(int i = (TEST_TUPLE - 1) ; i >= 0; i --) {
-      keytuple.put(0, DatumFactory.createInt4(i));
-      keytuple.put(1, DatumFactory.createInt4(i + 5));
-      long offsets = reader.find(keytuple);
-      scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("[seek check " + (i) + " ]" , i == tuple.get(0).asInt4());
-    }
-
-    TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple);
-    UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true);
-    TupleRange [] partitions = partitioner.partition(25);
-
-    File dataFile = new File((new Path(testDir, "output")).toUri());
-
-    // The below is for testing RangeRetrieverHandler.
-    RangeRetrieverHandler handler = new RangeRetrieverHandler(
-        dataFile, keySchema, comp);
-
-    List<Long []> offsets = new ArrayList<Long []>();
-
-    for (int i = 0; i < partitions.length; i++) {
-      FileChunk chunk = getFileChunk(handler, keySchema, partitions[i], i == 0);
-      offsets.add(new Long[] {chunk.startOffset(), chunk.length()});
-    }
-    scanner.close();
-
-    long fileLength = new File(dataFile, "data/data").length();
-    Long[] previous = null;
-    for (Long [] offset : offsets) {
-      if (previous == null) {
-        assertTrue(offset[0] + offset[1] == fileLength);
-        previous = offset;
-        continue;
-      }
-
-      assertTrue(offset[0] + offset[1] == previous[0]);
-      previous = offset;
-    }
-  }
-
-  private FileChunk getFileChunk(RangeRetrieverHandler handler, Schema keySchema,
-                                 TupleRange range, boolean last) throws IOException {
-    Map<String,List<String>> kvs = Maps.newHashMap();
-    RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema);
-    kvs.put("start", Lists.newArrayList(
-        new String(Base64.encodeBase64(
-            encoder.toBytes(range.getStart()),
-            false))));
-    kvs.put("end", Lists.newArrayList(
-        new String(Base64.encodeBase64(
-            encoder.toBytes(range.getEnd()), false))));
-
-    if (last) {
-      kvs.put("final", Lists.newArrayList("true"));
-    }
-    return handler.get(kvs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
deleted file mode 100644
index 25a2fbc..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.InterDataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Random;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestHttpDataServer {
-  private String TEST_DATA = "target/test-data/TestHttpDataServer";
-  
-  @Before
-  public void setUp() throws Exception {
-    CommonTestingUtil.getTestDir(TEST_DATA);
-  }
-
-  @Test
-  public final void testHttpDataServer() throws Exception {
-    Random rnd = new Random();
-    FileWriter writer = new FileWriter(TEST_DATA+"/"+"testHttp");
-    String watermark = "test_"+rnd.nextInt();
-    writer.write(watermark+"\n");
-    writer.flush();
-    writer.close();
-
-    DataRetriever ret = new DirectoryRetriever(TEST_DATA);
-    HttpDataServer server = new HttpDataServer(
-        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
-    server.start();
-
-    InetSocketAddress addr = server.getBindAddress();
-    URL url = new URL("http://127.0.0.1:"+addr.getPort() 
-        + "/testHttp");
-    BufferedReader in = new BufferedReader(new InputStreamReader(
-        url.openStream()));
-    String line;
-    boolean found = false;
-    while ((line = in.readLine()) != null) {
-      if (line.equals(watermark))
-        found = true;
-    }    
-    assertTrue(found);
-    in.close();    
-    server.stop();
-  }
-  
-  @Test
-  public final void testInterDataRetriver() throws Exception {
-    MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
-    ExecutionBlockId schid = plan.newExecutionBlockId();
-    TaskId qid1 = QueryIdFactory.newTaskId(schid);
-    TaskId qid2 = QueryIdFactory.newTaskId(schid);
-    
-    File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
-    qid1Dir.mkdirs();
-    File qid2Dir = new File(TEST_DATA + "/" + qid2.toString() + "/out");
-    qid2Dir.mkdirs();
-    
-    Random rnd = new Random();
-    FileWriter writer = new FileWriter(qid1Dir+"/"+"testHttp");
-    String watermark1 = "test_"+rnd.nextInt();
-    writer.write(watermark1);
-    writer.flush();
-    writer.close();
-        
-    writer = new FileWriter(qid2Dir+"/"+"testHttp");
-    String watermark2 = "test_"+rnd.nextInt();
-    writer.write(watermark2);
-    writer.flush();
-    writer.close();
-    
-    InterDataRetriever ret = new InterDataRetriever();
-    HttpDataServer server = new HttpDataServer(
-        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
-    server.start();
-    
-    ret.register(qid1, qid1Dir.getPath());
-    ret.register(qid2, qid2Dir.getPath());    
-    
-    InetSocketAddress addr = server.getBindAddress();
-    
-    assertDataRetrival(qid1, addr.getPort(), watermark1);
-    assertDataRetrival(qid2, addr.getPort(), watermark2);
-    
-    server.stop();
-  }
-  
-  @Test(expected = FileNotFoundException.class)
-  public final void testNoSuchFile() throws Exception {
-    MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
-    ExecutionBlockId schid = plan.newExecutionBlockId();
-    TaskId qid1 = QueryIdFactory.newTaskId(schid);
-    TaskId qid2 = QueryIdFactory.newTaskId(schid);
-    
-    File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
-    qid1Dir.mkdirs();
-    File qid2Dir = new File(TEST_DATA + "/" + qid2.toString() + "/out");
-    qid2Dir.mkdirs();
-    
-    Random rnd = new Random();
-    FileWriter writer = new FileWriter(qid1Dir+"/"+"testHttp");
-    String watermark1 = "test_"+rnd.nextInt();
-    writer.write(watermark1);
-    writer.flush();
-    writer.close();
-        
-    writer = new FileWriter(qid2Dir+"/"+"testHttp");
-    String watermark2 = "test_"+rnd.nextInt();
-    writer.write(watermark2);
-    writer.flush();
-    writer.close();
-    
-    InterDataRetriever ret = new InterDataRetriever();
-    HttpDataServer server = new HttpDataServer(
-        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
-    server.start();
-    
-    ret.register(qid1, qid1Dir.getPath());
-    InetSocketAddress addr = server.getBindAddress();
-    assertDataRetrival(qid1, addr.getPort(), watermark1);
-    ret.unregister(qid1);
-    assertDataRetrival(qid1, addr.getPort(), watermark1);
-  }
-  
-  private static void assertDataRetrival(TaskId id, int port,
-      String watermark) throws IOException {
-    URL url = new URL("http://127.0.0.1:"+port
-        + "/?qid=" + id.toString() + "&fn=testHttp");
-    BufferedReader in = new BufferedReader(new InputStreamReader(
-        url.openStream()));
-    String line;
-    boolean found = false;
-    while ((line = in.readLine()) != null) {
-      if (line.equals(watermark))
-        found = true;
-    }    
-    assertTrue(found);
-    in.close();    
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java
deleted file mode 100644
index bb2eb82..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import com.google.common.collect.Maps;
-import org.junit.Test;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestHttpUtil {
-  private URI uri = URI.create("http://127.0.0.1:80/?key1=val1&key2=val2");
-
-  @Test
-  public void testGetParams() throws UnsupportedEncodingException {
-    Map<String,String> params = HttpUtil.getParamsFromQuery(uri.getQuery());
-    assertEquals(2, params.size());
-    assertEquals("val1", params.get("key1"));
-    assertEquals("val2", params.get("key2"));
-  }
-
-  @Test
-  public void testBuildQuery() throws UnsupportedEncodingException {
-    Map<String,String> params = Maps.newTreeMap();
-    params.put("key1", "val1");
-    params.put("key2", "val2");
-    String query = HttpUtil.buildQuery(params);
-    assertEquals(uri.getQuery(), query);
-  }
-}


Mime
View raw message