tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject [3/3] tajo git commit: TAJO-527: Upgrade to Netty 4
Date Tue, 03 Mar 2015 13:11:24 GMT
TAJO-527: Upgrade to Netty 4

Closes #311


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/22876a82
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/22876a82
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/22876a82

Branch: refs/heads/master
Commit: 22876a825e9d19b0f599c342d4ae3902d85f2c4d
Parents: 64e47a4
Author: Jihun Kang <jihun@apache.org>
Authored: Tue Mar 3 22:10:21 2015 +0900
Committer: Jihun Kang <jihun@apache.org>
Committed: Tue Mar 3 22:10:21 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   1 +
 .../org/apache/tajo/client/QueryClientImpl.java |   3 +-
 .../apache/tajo/client/SessionConnection.java   |  15 +-
 tajo-core/pom.xml                               |   4 +
 .../java/org/apache/tajo/master/TajoMaster.java |   2 +-
 .../tajo/worker/ExecutionBlockContext.java      |  37 +--
 .../java/org/apache/tajo/worker/Fetcher.java    | 198 +++++++-------
 .../java/org/apache/tajo/worker/TajoWorker.java |   2 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  12 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |   4 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |  13 +-
 .../apache/tajo/master/TestRepartitioner.java   |   5 +-
 .../org/apache/tajo/worker/TestFetcher.java     |  25 +-
 tajo-project/pom.xml                            |  24 +-
 tajo-pullserver/pom.xml                         |   8 +
 .../tajo/pullserver/FadvisedChunkedFile.java    |  17 +-
 .../tajo/pullserver/FadvisedFileRegion.java     |  16 +-
 .../tajo/pullserver/FileCloseListener.java      |   8 +-
 .../HttpDataServerChannelInitializer.java       |  58 +++++
 .../tajo/pullserver/HttpDataServerHandler.java  | 137 +++++-----
 .../HttpDataServerPipelineFactory.java          |  56 ----
 .../tajo/pullserver/PullServerAuxService.java   | 229 ++++++++--------
 .../tajo/pullserver/TajoPullServerService.java  | 259 ++++++++++---------
 .../retriever/AdvancedDataRetriever.java        |  10 +-
 .../pullserver/retriever/DataRetriever.java     |   4 +-
 .../retriever/DirectoryRetriever.java           |   5 +-
 tajo-rpc/pom.xml                                |  10 +-
 .../org/apache/tajo/rpc/AsyncRpcClient.java     | 106 +++++---
 .../org/apache/tajo/rpc/AsyncRpcServer.java     | 126 ++++-----
 .../org/apache/tajo/rpc/BlockingRpcClient.java  | 122 +++++----
 .../org/apache/tajo/rpc/BlockingRpcServer.java  | 125 +++++----
 .../java/org/apache/tajo/rpc/CallFuture.java    |   8 +-
 .../apache/tajo/rpc/DefaultRpcController.java   |   7 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    | 133 ++++++----
 .../org/apache/tajo/rpc/NettyServerBase.java    |  82 +++---
 .../java/org/apache/tajo/rpc/NullCallback.java  |   2 +-
 .../tajo/rpc/ProtoChannelInitializer.java       |  50 ++++
 .../apache/tajo/rpc/ProtoPipelineFactory.java   |  50 ----
 .../org/apache/tajo/rpc/RpcChannelFactory.java  | 160 ++++++++----
 .../org/apache/tajo/rpc/RpcConnectionPool.java  |  87 +++----
 .../org/apache/tajo/rpc/ServerCallable.java     |  10 +-
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  | 144 ++++++++---
 .../org/apache/tajo/rpc/TestBlockingRpc.java    | 138 +++++++---
 .../rpc/test/impl/DummyProtocolAsyncImpl.java   |   3 +-
 tajo-storage/tajo-storage-hdfs/pom.xml          |  12 +
 .../java/org/apache/tajo/HttpFileServer.java    |  44 ++--
 .../tajo/HttpFileServerChannelInitializer.java  |  47 ++++
 .../org/apache/tajo/HttpFileServerHandler.java  | 109 ++++----
 .../tajo/HttpFileServerPipelineFactory.java     |  54 ----
 49 files changed, 1552 insertions(+), 1229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e8c8b18..668c0db 100644
--- a/CHANGES
+++ b/CHANGES
@@ -7,6 +7,7 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-527: Upgrade to Netty 4. (jihun)
 
   BUG FIXES
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index bc89679..fae613a 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.client;
 
 import com.google.protobuf.ServiceException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.*;
@@ -32,6 +33,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.util.ProtoUtil;
 
@@ -83,7 +85,6 @@ public class QueryClientImpl implements QueryClient {
 
   @Override
   public void close() {
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index f8762da..bcf6d8b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -34,7 +34,8 @@ import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.ProtoUtil;
-import org.jboss.netty.channel.ConnectTimeoutException;
+
+import io.netty.channel.ConnectTimeoutException;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -84,11 +85,7 @@ public class SessionConnection implements Closeable {
 
     this.properties = properties;
 
-    //TODO separate ConfVars from TajoConf
-    int workerNum = this.properties.getInt("tajo.rpc.client.worker-thread-num", 4);
-
-    // Don't share connection pool per client
-    connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
+    connPool = RpcConnectionPool.getPool();
     userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
 
@@ -130,7 +127,7 @@ public class SessionConnection implements Closeable {
     if(!closed.get()){
       try {
         return connPool.getConnection(serviceTracker.getClientServiceAddress(),
-            TajoMasterClientProtocol.class, false).isConnected();
+            TajoMasterClientProtocol.class, false).isActive();
       } catch (Throwable e) {
         return false;
       }
@@ -288,10 +285,6 @@ public class SessionConnection implements Closeable {
 
     } catch (Throwable e) {
     }
-
-    if(connPool != null) {
-      connPool.shutdown();
-    }
   }
 
   protected InetSocketAddress getTajoMasterAddr() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index ce9db73..d3c7ed6 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -388,6 +388,10 @@
       <version>3.1.1</version>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.mortbay.jetty</groupId>
       <artifactId>jetty</artifactId>
       <version>6.1.14</version>

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 586abb0..6f7c5a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -559,7 +559,7 @@ public class TajoMaster extends CompositeService {
         LOG.info("TajoMaster received SIGINT Signal");
         LOG.info("============================================");
         stop();
-        RpcChannelFactory.shutdown();
+        RpcChannelFactory.shutdownGracefully();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 8cf94eb..813c502 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -42,9 +42,10 @@ import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.Timer;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.EventLoopGroup;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -67,7 +68,7 @@ public class ExecutionBlockContext {
   public AtomicInteger killedTasksNum = new AtomicInteger();
   public AtomicInteger failedTasksNum = new AtomicInteger();
 
-  private ClientSocketChannelFactory channelFactory;
+  private EventLoopGroup loopGroup;
   // for temporal or intermediate files
   private FileSystem localFS;
   // for input files
@@ -184,12 +185,6 @@ public class ExecutionBlockContext {
     tasks.clear();
 
     resource.release();
-
-    try {
-      releaseShuffleChannelFactory();
-    } catch (Throwable e) {
-      LOG.error(e.getMessage(), e);
-    }
   }
 
   public TajoConf getConf() {
@@ -267,30 +262,10 @@ public class ExecutionBlockContext {
     return histories.get(runner.getId());
   }
 
-  public TajoWorker.WorkerContext getWorkerContext() {
+  public TajoWorker.WorkerContext getWorkerContext(){
     return workerContext;
   }
 
-  protected ClientSocketChannelFactory getShuffleChannelFactory(){
-    if(channelFactory == null) {
-      int workerNum = getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM);
-      channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
-    }
-    return channelFactory;
-  }
-
-  public Timer getRPCTimer() {
-    return manager.getRPCTimer();
-  }
-
-  protected void releaseShuffleChannelFactory(){
-    if(channelFactory != null) {
-      channelFactory.shutdown();
-      channelFactory.releaseExternalResources();
-      channelFactory = null;
-    }
-  }
-
   private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
     getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 742a025..fc57a96 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -18,20 +18,33 @@
 
 package org.apache.tajo.worker;
 
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.Timer;
+import org.apache.tajo.rpc.RpcChannelFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpContentDecompressor;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -40,8 +53,7 @@ import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.FileChannel;
-
-import static org.jboss.netty.channel.Channels.pipeline;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Fetcher fetches data from a given uri via HTTP protocol and stores them into
@@ -64,17 +76,15 @@ public class Fetcher {
   private long fileLen;
   private int messageReceiveCount;
   private TajoProtos.FetcherState state;
-  private Timer timer;
 
-  private ClientBootstrap bootstrap;
+  private Bootstrap bootstrap;
 
-  public Fetcher(TajoConf conf, URI uri, FileChunk chunk, ClientSocketChannelFactory factory, Timer timer) {
+  public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
     this.uri = uri;
     this.fileChunk = chunk;
     this.useLocalFile = !chunk.fromRemote();
     this.state = TajoProtos.FetcherState.FETCH_INIT;
     this.conf = conf;
-    this.timer = timer;
 
     String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
     this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -88,13 +98,18 @@ public class Fetcher {
     }
 
     if (!useLocalFile) {
-      bootstrap = new ClientBootstrap(factory);
-      bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
-      bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
-      bootstrap.setOption("tcpNoDelay", true);
-
-      ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(fileChunk.getFile());
-      bootstrap.setPipelineFactory(pipelineFactory);
+      bootstrap = new Bootstrap()
+        .group(
+            RpcChannelFactory.getSharedClientEventloopGroup(RpcChannelFactory.ClientChannelId.FETCHER,
+                conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+        .channel(NioSocketChannel.class)
+        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // set 5 sec
+        .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+        .option(ChannelOption.TCP_NODELAY, true);
+
+      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+      bootstrap.handler(initializer);
     }
   }
 
@@ -132,30 +147,30 @@ public class Fetcher {
     this.state = TajoProtos.FetcherState.FETCH_FETCHING;
     ChannelFuture future = null;
     try {
-      future = bootstrap.connect(new InetSocketAddress(host, port));
+      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+              .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
 
       // Wait until the connection attempt succeeds or fails.
-      Channel channel = future.awaitUninterruptibly().getChannel();
+      Channel channel = future.awaitUninterruptibly().channel();
       if (!future.isSuccess()) {
-        future.getChannel().close();
         state = TajoProtos.FetcherState.FETCH_FAILED;
-        throw new IOException(future.getCause());
+        throw new IOException(future.cause());
       }
 
       String query = uri.getPath()
           + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
       // Prepare the HTTP request.
       HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
-      request.setHeader(HttpHeaders.Names.HOST, host);
-      request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-      request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+      request.headers().set(HttpHeaders.Names.HOST, host);
+      request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
 
       LOG.info("Status: " + getState() + ", URI:" + uri);
       // Send the HTTP request.
-      ChannelFuture channelFuture = channel.write(request);
+      ChannelFuture channelFuture = channel.writeAndFlush(request);
 
       // Wait for the server to close the connection.
-      channel.getCloseFuture().awaitUninterruptibly();
+      channel.closeFuture().awaitUninterruptibly();
 
       channelFuture.addListener(ChannelFutureListener.CLOSE);
 
@@ -164,7 +179,7 @@ public class Fetcher {
     } finally {
       if(future != null){
         // Close the channel to exit.
-        future.getChannel().close();
+        future.channel().close();
       }
 
       this.finishTime = System.currentTimeMillis();
@@ -176,8 +191,7 @@ public class Fetcher {
     return this.uri;
   }
 
-  class HttpClientHandler extends SimpleChannelUpstreamHandler {
-    private volatile boolean readingChunks;
+  class HttpClientHandler extends ChannelInboundHandlerAdapter {
     private final File file;
     private RandomAccessFile raf;
     private FileChannel fc;
@@ -185,27 +199,27 @@ public class Fetcher {
 
     public HttpClientHandler(File file) throws FileNotFoundException {
       this.file = file;
+      this.raf = new RandomAccessFile(file, "rw");
+      this.fc = raf.getChannel();
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
 
       messageReceiveCount++;
-      try {
-        if (!readingChunks && e.getMessage() instanceof HttpResponse) {
-
-          HttpResponse response = (HttpResponse) e.getMessage();
+      if (msg instanceof HttpResponse) {
+        try {
+          HttpResponse response = (HttpResponse) msg;
 
           StringBuilder sb = new StringBuilder();
           if (LOG.isDebugEnabled()) {
-            sb.append("STATUS: ").append(response.getStatus())
-                .append(", VERSION: ").append(response.getProtocolVersion())
-                .append(", HEADER: ");
+            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+                .append(response.getProtocolVersion()).append(", HEADER: ");
           }
-          if (!response.getHeaderNames().isEmpty()) {
-            for (String name : response.getHeaderNames()) {
-              for (String value : response.getHeaders(name)) {
+          if (!response.headers().names().isEmpty()) {
+            for (String name : response.headers().names()) {
+              for (String value : response.headers().getAll(name)) {
                 if (LOG.isDebugEnabled()) {
                   sb.append(name).append(" = ").append(value);
                 }
@@ -219,109 +233,99 @@ public class Fetcher {
             LOG.debug(sb.toString());
           }
 
-          if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) {
+          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
             LOG.warn("There are no data corresponding to the request");
             length = 0;
             return;
-          } else if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()){
-            LOG.error(response.getStatus().getReasonPhrase());
+          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
+            LOG.error(response.getStatus().reasonPhrase());
             state = TajoProtos.FetcherState.FETCH_FAILED;
             return;
           }
+        } catch (Exception e) {
+          LOG.error(e.getMessage());
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
 
-          this.raf = new RandomAccessFile(file, "rw");
-          this.fc = raf.getChannel();
+      if (msg instanceof HttpContent) {
+        try {
+          HttpContent httpContent = (HttpContent) msg;
+          ByteBuf content = httpContent.content();
+          if (content.isReadable()) {
+            content.readBytes(fc, content.readableBytes());
+          }
 
-          if (response.isChunked()) {
-            readingChunks = true;
-          } else {
-            ChannelBuffer content = response.getContent();
-            if (content.readable()) {
-              fc.write(content.toByteBuffer());
+          if (msg instanceof LastHttpContent) {
+            if (raf != null) {
+              fileLen = file.length();
             }
-          }
-        } else {
-          HttpChunk chunk = (HttpChunk) e.getMessage();
-          if (chunk.isLast()) {
-            readingChunks = false;
-            long fileLength = file.length();
-            if (fileLength == length) {
-              LOG.info("Data fetch is done (total received bytes: " + fileLength
-                  + ")");
-            } else {
-              LOG.info("Data fetch is done, but cannot get all data "
-                  + "(received/total: " + fileLength + "/" + length + ")");
+
+            IOUtils.cleanup(LOG, fc, raf);
+            if (ctx.channel().isActive()) {
+              ctx.channel().close();
             }
-          } else {
-            if(fc != null){
-              fc.write(chunk.getContent().toByteBuffer());
+            finishTime = System.currentTimeMillis();
+            if (state != TajoProtos.FetcherState.FETCH_FAILED) {
+              state = TajoProtos.FetcherState.FETCH_FINISHED;
             }
           }
-        }
-      } finally {
-        if(raf != null) {
-          fileLen = file.length();
-        }
-
-        if(fileLen == length){
-          IOUtils.cleanup(LOG, fc, raf);
-          finishTime = System.currentTimeMillis();
-          state = TajoProtos.FetcherState.FETCH_FINISHED;
+        } catch (Exception e) {
+          LOG.error(e.getMessage());
+        } finally {
+          ReferenceCountUtil.release(msg);
         }
       }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      if (e.getCause() instanceof ReadTimeoutException) {
-        LOG.warn(e.getCause());
+      if (cause instanceof ReadTimeoutException) {
+        LOG.warn(cause);
       } else {
-        LOG.error("Fetch failed :", e.getCause());
+        LOG.error("Fetch failed :", cause);
       }
 
       // this fetching will be retry
       IOUtils.cleanup(LOG, fc, raf);
-      if(ctx.getChannel().isConnected()){
-        ctx.getChannel().close();
-      }
       finishTime = System.currentTimeMillis();
       state = TajoProtos.FetcherState.FETCH_FAILED;
+      ctx.close();
     }
 
     @Override
-    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-      super.channelDisconnected(ctx, e);
-
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
       if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
         //channel is closed, but cannot complete fetcher
         finishTime = System.currentTimeMillis();
         state = TajoProtos.FetcherState.FETCH_FAILED;
       }
       IOUtils.cleanup(LOG, fc, raf);
+      
+      super.channelUnregistered(ctx);
     }
   }
 
-  class HttpClientPipelineFactory implements
-      ChannelPipelineFactory {
+  class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
     private final File file;
 
-    public HttpClientPipelineFactory(File file) {
+    public HttpClientChannelInitializer(File file) {
       this.file = file;
     }
 
     @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = pipeline();
+    protected void initChannel(Channel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
 
       int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
       int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
 
       pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
       pipeline.addLast("inflater", new HttpContentDecompressor());
-      pipeline.addLast("timeout", new ReadTimeoutHandler(timer, readTimeout));
+      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
       pipeline.addLast("handler", new HttpClientHandler(file));
-      return pipeline;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 7e2a233..3c55add 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -604,7 +604,7 @@ public class TajoWorker extends CompositeService {
         LOG.info("TajoWorker received SIGINT Signal");
         LOG.info("============================================");
         stop();
-        RpcChannelFactory.shutdown();
+        RpcChannelFactory.shutdownGracefully();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index df3be12..ef94337 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import io.netty.channel.EventLoopGroup;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,9 +56,8 @@ import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.util.Timer;
+
+import io.netty.handler.codec.http.QueryStringDecoder;
 
 import java.io.File;
 import java.io.IOException;
@@ -664,8 +664,6 @@ public class Task {
                                         List<FetchImpl> fetches) throws IOException {
 
     if (fetches.size() > 0) {
-      ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory();
-      Timer timer = executionBlockContext.getRPCTimer();
       Path inputDir = executionBlockContext.getLocalDirAllocator().
           getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
 
@@ -716,7 +714,7 @@ public class Task {
           // If we decide that intermediate data should be really fetched from a remote host, storeChunk
           // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
           storeChunk.setEbId(f.getName());
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer);
+          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
           LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
           runnerList.add(fetcher);
           i++;
@@ -732,7 +730,7 @@ public class Task {
   private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
     // Parse the URI
     LOG.info("getLocalStoredFileChunk starts");
-    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters();
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
     final List<String> types = params.get("type");
     final List<String> qids = params.get("qid");
     final List<String> taskIdList = params.get("ta");

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index cf50767..2cdebc8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.worker;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,8 @@ import org.apache.tajo.master.container.TajoContainerIdPBImpl;
 import org.apache.tajo.master.container.TajoConverterUtils;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NullCallback;
-import org.jboss.netty.channel.ConnectTimeoutException;
+
+import io.netty.channel.ConnectTimeoutException;
 
 import java.util.concurrent.*;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 570bd38..3f4a1b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -20,6 +20,7 @@ package org.apache.tajo.worker;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +34,6 @@ import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.worker.event.TaskRunnerEvent;
 import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 import org.apache.tajo.worker.event.TaskRunnerStopEvent;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
 
 import java.io.IOException;
 import java.util.*;
@@ -52,7 +51,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
   private AtomicBoolean stop = new AtomicBoolean(false);
   private FinishedTaskCleanThread finishedTaskCleanThread;
   private Dispatcher dispatcher;
-  private HashedWheelTimer rpcTimer;
 
   public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) {
     super(TaskRunnerManager.class.getName());
@@ -77,7 +75,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
   public void start() {
     finishedTaskCleanThread = new FinishedTaskCleanThread();
     finishedTaskCleanThread.start();
-    rpcTimer = new HashedWheelTimer();
     super.start();
   }
 
@@ -102,10 +99,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
       finishedTaskCleanThread.interrupted();
     }
 
-    if(rpcTimer != null){
-      rpcTimer.stop();
-    }
-
     super.stop();
     if(workerContext.isYarnContainerMode()) {
       workerContext.stopWorker(true);
@@ -214,10 +207,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
     return tajoConf;
   }
 
-  public Timer getRPCTimer(){
-    return rpcTimer;
-  }
-
   class FinishedTaskCleanThread extends Thread {
     //TODO if history size is large, the historyMap should remove immediately
     public void run() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 438867e..9910d79 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -31,9 +31,10 @@ import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.junit.Test;
 
+import io.netty.handler.codec.http.QueryStringDecoder;
+
 import java.net.URI;
 import java.util.*;
 
@@ -89,7 +90,7 @@ public class TestRepartitioner {
 
       URI uri = uris.get(0);
       final Map<String, List<String>> params =
-          new QueryStringDecoder(uri).getParameters();
+          new QueryStringDecoder(uri).parameters();
 
       assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
       assertEquals("h", params.get("type").get(0));

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index b3654f9..513eb69 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -27,15 +27,9 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,8 +44,6 @@ public class TestFetcher {
   private String OUTPUT_DIR = TEST_DATA+"/out/";
   private TajoConf conf = new TajoConf();
   private TajoPullServerService pullServerService;
-  private ClientSocketChannelFactory channelFactory;
-  private Timer timer;
 
   @Before
   public void setUp() throws Exception {
@@ -65,16 +57,11 @@ public class TestFetcher {
     pullServerService = new TajoPullServerService();
     pullServerService.init(conf);
     pullServerService.start();
-
-    channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
-    timer = new HashedWheelTimer();
   }
 
   @After
   public void tearDown(){
     pullServerService.stop();
-    channelFactory.releaseExternalResources();
-    timer.stop();
   }
 
   @Test
@@ -102,7 +89,7 @@ public class TestFetcher {
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
     FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
     storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
     FileChunk chunk = fetcher.get();
     assertNotNull(chunk);
     assertNotNull(chunk.getFile());
@@ -148,7 +135,7 @@ public class TestFetcher {
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
     FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
     storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -178,7 +165,7 @@ public class TestFetcher {
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
     FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
     storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -212,7 +199,7 @@ public class TestFetcher {
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
     FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
     storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -232,7 +219,7 @@ public class TestFetcher {
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
     FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
     storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     pullServerService.stop();

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 30f864c..3820d50 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -37,6 +37,7 @@
     <protobuf.version>2.5.0</protobuf.version>
     <tajo.version>0.10.0-SNAPSHOT</tajo.version>
     <hbase.version>0.98.7-hadoop2</hbase.version>
+    <netty.version>4.0.25.Final</netty.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
     <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
   </properties>
@@ -1024,13 +1025,28 @@
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
-        <artifactId>netty</artifactId>
-        <version>3.6.6.Final</version>
+        <artifactId>netty-buffer</artifactId>
+        <version>${netty.version}</version>
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
-        <artifactId>netty-buffer</artifactId>
-        <version>4.0.24.Final</version>
+        <artifactId>netty-transport</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-codec</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-codec-http</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-handler</artifactId>
+        <version>${netty.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.derby</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index 6d13a3c..cdbda3e 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -47,6 +47,14 @@
 
   <dependencies>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-rpc</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
index b0b8d18..3df82e6 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
@@ -22,7 +22,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.handler.stream.ChunkedFile;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedFile;
 
 import java.io.FileDescriptor;
 import java.io.IOException;
@@ -52,13 +55,13 @@ public class FadvisedChunkedFile extends ChunkedFile {
   }
 
   @Override
-  public Object nextChunk() throws Exception {
+  public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
     if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
       readaheadRequest = readaheadPool
-          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
-              getEndOffset(), readaheadRequest);
+          .readaheadStream(identifier, fd, currentOffset(), readaheadLength,
+              endOffset(), readaheadRequest);
     }
-    return super.nextChunk();
+    return super.readChunk(ctx);
   }
 
   @Override
@@ -66,11 +69,11 @@ public class FadvisedChunkedFile extends ChunkedFile {
     if (readaheadRequest != null) {
       readaheadRequest.cancel();
     }
-    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && endOffset() - startOffset() > 0) {
       try {
         PullServerUtil.posixFadviseIfPossible(identifier,
             fd,
-            getStartOffset(), getEndOffset() - getStartOffset(),
+            startOffset(), endOffset() - startOffset(),
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Throwable t) {
         LOG.warn("Failed to manage OS cache for " + identifier, t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
index 18cf4b6..643d9e0 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -19,11 +19,13 @@
 package org.apache.tajo.pullserver;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.channel.DefaultFileRegion;
+
+import io.netty.channel.DefaultFileRegion;
 
 import java.io.FileDescriptor;
 import java.io.IOException;
@@ -79,8 +81,8 @@ public class FadvisedFileRegion extends DefaultFileRegion {
       throws IOException {
     if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
       readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
-          getPosition() + position, readaheadLength,
-          getPosition() + getCount(), readaheadRequest);
+          position() + position, readaheadLength,
+          position() + count(), readaheadRequest);
     }
 
     if(this.shuffleTransferToAllowed) {
@@ -146,11 +148,11 @@ public class FadvisedFileRegion extends DefaultFileRegion {
 
 
   @Override
-  public void releaseExternalResources() {
+  protected void deallocate() {
     if (readaheadRequest != null) {
       readaheadRequest.cancel();
     }
-    super.releaseExternalResources();
+    super.deallocate();
   }
 
   /**
@@ -158,9 +160,9 @@ public class FadvisedFileRegion extends DefaultFileRegion {
    * we don't need the region to be cached anymore.
    */
   public void transferSuccessful() {
-    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && count() > 0 && super.isOpen()) {
       try {
-        PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+        PullServerUtil.posixFadviseIfPossible(identifier, fd, position(), count(),
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Throwable t) {
         LOG.warn("Failed to manage OS cache for " + identifier, t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
index 236db89..9c3c523 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -18,10 +18,10 @@
 
 package org.apache.tajo.pullserver;
 
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
 
-public class FileCloseListener implements ChannelFutureListener {
+public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
 
   private FadvisedFileRegion filePart;
   private String requestUri;
@@ -45,7 +45,7 @@ public class FileCloseListener implements ChannelFutureListener {
     if(future.isSuccess()){
       filePart.transferSuccessful();
     }
-    filePart.releaseExternalResources();
+    filePart.deallocate();
     if (pullServerService != null) {
       pullServerService.completeFileChunk(filePart, requestUri, startTime);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java
new file mode 100644
index 0000000..8661ee5
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+
+public class HttpDataServerChannelInitializer extends ChannelInitializer<Channel> {
+  private String userName;
+  private String appId;
+  public HttpDataServerChannelInitializer(String userName, String appId) {
+    this.userName = userName;
+    this.appId = appId;
+  }
+
+  @Override
+  protected void initChannel(Channel channel) throws Exception {
+ // Create a default pipeline implementation.
+    ChannelPipeline pipeline = channel.pipeline();
+
+    // Uncomment the following line if you want HTTPS
+    // SSLEngine engine =
+    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
+    // engine.setUseClientMode(false);
+    // pipeline.addLast("ssl", new SslHandler(engine));
+
+    pipeline.addLast("decoder", new HttpRequestDecoder());
+    //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+    pipeline.addLast("encoder", new HttpResponseEncoder());
+    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+    pipeline.addLast("deflater", new HttpContentCompressor());
+    pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
index bfb70b4..472b967 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -19,19 +19,21 @@
 package org.apache.tajo.pullserver;
 
 import com.google.common.collect.Lists;
+
+import io.netty.channel.*;
+import io.netty.handler.codec.http.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.pullserver.retriever.DataRetriever;
 import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
 
 import java.io.*;
 import java.net.URLDecoder;
@@ -41,14 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
+public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
   private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
 
   Map<ExecutionBlockId, DataRetriever> retrievers =
@@ -62,21 +57,18 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
   }
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+  public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
       throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
+
+    if (request.getMethod() != HttpMethod.GET) {
+      sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
       return;
     }
 
-    String base =
-        ContainerLocalizer.USERCACHE + "/" + userName + "/"
-            + ContainerLocalizer.APPCACHE + "/"
-            + appId + "/output" + "/";
+    String base = ContainerLocalizer.USERCACHE + "/" + userName + "/" + ContainerLocalizer.APPCACHE + "/" + appId
+        + "/output" + "/";
 
-    final Map<String, List<String>> params =
-        new QueryStringDecoder(request.getUri()).getParameters();
+    final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
 
     List<FileChunk> chunks = Lists.newArrayList();
     List<String> taskIds = splitMaps(params.get("ta"));
@@ -90,65 +82,54 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-//    try {
-//      file = retriever.handle(ctx, request);
-//    } catch (FileNotFoundException fnf) {
-//      LOG.error(fnf);
-//      sendError(ctx, NOT_FOUND);
-//      return;
-//    } catch (IllegalArgumentException iae) {
-//      LOG.error(iae);
-//      sendError(ctx, BAD_REQUEST);
-//      return;
-//    } catch (FileAccessForbiddenException fafe) {
-//      LOG.error(fafe);
-//      sendError(ctx, FORBIDDEN);
-//      return;
-//    } catch (IOException ioe) {
-//      LOG.error(ioe);
-//      sendError(ctx, INTERNAL_SERVER_ERROR);
-//      return;
-//    }
 
     // Write the content.
-    Channel ch = e.getChannel();
     if (file == null) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-      ch.write(response);
-      if (!isKeepAlive(request)) {
-        ch.close();
+      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+      if (!HttpHeaders.isKeepAlive(request)) {
+        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+      } else {
+        response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        ctx.writeAndFlush(response);
       }
-    }  else {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+    } else {
+      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      ChannelFuture writeFuture = null;
       long totalSize = 0;
       for (FileChunk chunk : file) {
         totalSize += chunk.length();
       }
-      setContentLength(response, totalSize);
+      HttpHeaders.setContentLength(response, totalSize);
 
+      if (HttpHeaders.isKeepAlive(request)) {
+        response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+      }
       // Write the initial line and the header.
-      ch.write(response);
-
-      ChannelFuture writeFuture = null;
+      writeFuture = ctx.write(response);
 
       for (FileChunk chunk : file) {
-        writeFuture = sendFile(ctx, ch, chunk);
+        writeFuture = sendFile(ctx, chunk);
         if (writeFuture == null) {
-          sendError(ctx, NOT_FOUND);
+          sendError(ctx, HttpResponseStatus.NOT_FOUND);
           return;
         }
       }
+      if (ctx.pipeline().get(SslHandler.class) == null) {
+        writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+      } else {
+        ctx.flush();
+      }
 
       // Decide whether to close the connection or not.
-      if (!isKeepAlive(request)) {
+      if (!HttpHeaders.isKeepAlive(request)) {
         // Close the connection when the whole content is written out.
         writeFuture.addListener(ChannelFutureListener.CLOSE);
       }
     }
+
   }
 
   private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                 Channel ch,
                                  FileChunk file) throws IOException {
     RandomAccessFile raf;
     try {
@@ -158,38 +139,41 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     ChannelFuture writeFuture;
-    if (ch.getPipeline().get(SslHandler.class) != null) {
+    ChannelFuture lastContentFuture;
+    if (ctx.pipeline().get(SslHandler.class) != null) {
       // Cannot use zero-copy with HTTPS.
-      writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(),
-          file.length(), 8192));
+      lastContentFuture = ctx.write(new HttpChunkedInput(new ChunkedFile(raf, file.startOffset(),
+          file.length(), 8192)));
     } else {
       // No encryption - use zero-copy.
       final FileRegion region = new DefaultFileRegion(raf.getChannel(),
           file.startOffset(), file.length());
-      writeFuture = ch.write(region);
+      writeFuture = ctx.write(region);
+      lastContentFuture = ctx.write(LastHttpContent.EMPTY_LAST_CONTENT);
       writeFuture.addListener(new ChannelFutureListener() {
         public void operationComplete(ChannelFuture future) {
-          region.releaseExternalResources();
+          if (region.refCnt() > 0) {
+            region.release();
+          }
         }
       });
     }
 
-    return writeFuture;
+    return lastContentFuture;
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
       throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
+    Channel ch = ctx.channel();
     if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
+      sendError(ctx, HttpResponseStatus.BAD_REQUEST);
       return;
     }
 
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
+    LOG.error(cause.getMessage(), cause);
+    if (ch.isActive()) {
+      sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
     }
   }
 
@@ -221,13 +205,12 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
   }
 
   private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+        Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+    response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
 
     // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
   }
 
   private List<String> splitMaps(List<String> qids) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index 4c8bd8b..0000000
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
-  private String userName;
-  private String appId;
-  public HttpDataServerPipelineFactory(String userName, String appId) {
-    this.userName = userName;
-    this.appId = appId;
-  }
-
-  public ChannelPipeline getPipeline() throws Exception {
-    // Create a default pipeline implementation.
-    ChannelPipeline pipeline = pipeline();
-
-    // Uncomment the following line if you want HTTPS
-    // SSLEngine engine =
-    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
-    // engine.setUseClientMode(false);
-    // pipeline.addLast("ssl", new SslHandler(engine));
-
-    pipeline.addLast("decoder", new HttpRequestDecoder());
-    //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
-    pipeline.addLast("encoder", new HttpResponseEncoder());
-    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-    pipeline.addLast("deflater", new HttpContentCompressor());
-    pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
-    return pipeline;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index d633058..ce4018b 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -19,7 +19,22 @@
 package org.apache.tajo.pullserver;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,23 +63,13 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -78,16 +83,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 public class PullServerAuxService extends AuxiliaryService {
 
@@ -100,9 +95,9 @@ public class PullServerAuxService extends AuxiliaryService {
   public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  private HttpPipelineFactory pipelineFact;
+  private ServerBootstrap selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+  private HttpChannelInitializer initializer;
   private int sslFileBufferSize;
 
   private ApplicationId appId;
@@ -130,7 +125,7 @@ public class PullServerAuxService extends AuxiliaryService {
   public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
 
   @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
-  static class ShuffleMetrics implements ChannelFutureListener {
+  static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> {
     @Metric({"OutputBytes","PullServer output in bytes"})
     MutableCounterLong shuffleOutputBytes;
     @Metric({"Failed","# of failed shuffle outputs"})
@@ -211,16 +206,10 @@ public class PullServerAuxService extends AuxiliaryService {
       readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
           DEFAULT_SHUFFLE_READAHEAD_BYTES);
 
-      ThreadFactory bossFactory = new ThreadFactoryBuilder()
-          .setNameFormat("PullServerAuxService Netty Boss #%d")
-          .build();
-      ThreadFactory workerFactory = new ThreadFactoryBuilder()
-          .setNameFormat("PullServerAuxService Netty Worker #%d")
-          .build();
-
-      selector = new NioServerSocketChannelFactory(
-          Executors.newCachedThreadPool(bossFactory),
-          Executors.newCachedThreadPool(workerFactory));
+      selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", 0)
+                  .option(ChannelOption.TCP_NODELAY, true)
+                  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                  .childOption(ChannelOption.TCP_NODELAY, true);
 
       localFS = new LocalFileSystem();
       super.init(new Configuration(conf));
@@ -233,20 +222,23 @@ public class PullServerAuxService extends AuxiliaryService {
   @Override
   public synchronized void start() {
     Configuration conf = getConfig();
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    ServerBootstrap bootstrap = selector.clone();
     try {
-      pipelineFact = new HttpPipelineFactory(conf);
+      initializer = new HttpChannelInitializer(conf);
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
-    bootstrap.setPipelineFactory(pipelineFact);
+    bootstrap.channel(NioServerSocketChannel.class)
+            .handler(initializer);
     port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
         ConfVars.PULLSERVER_PORT.defaultIntVal);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
-    accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+            .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
+            .syncUninterruptibly();
+    accepted.add(future.channel());
+    port = ((InetSocketAddress)future.channel().localAddress()).getPort();
     conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
-    pipelineFact.PullServer.setPort(port);
+    initializer.PullServer.setPort(port);
     LOG.info(getName() + " listening on port " + port);
     super.start();
 
@@ -261,10 +253,19 @@ public class PullServerAuxService extends AuxiliaryService {
   @Override
   public synchronized void stop() {
     try {
-      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-      ServerBootstrap bootstrap = new ServerBootstrap(selector);
-      bootstrap.releaseExternalResources();
-      pipelineFact.destroy();
+      accepted.close();
+      if (selector != null) {
+        if (selector.group() != null) {
+          selector.group().shutdownGracefully();
+        }
+        if (selector.childGroup() != null) {
+          selector.childGroup().shutdownGracefully();
+        }
+      }
+
+      if (initializer != null) {
+        initializer.destroy();
+      }
 
       localFS.close();
     } catch (Throwable t) {
@@ -285,12 +286,12 @@ public class PullServerAuxService extends AuxiliaryService {
     }
   }
 
-  class HttpPipelineFactory implements ChannelPipelineFactory {
+  class HttpChannelInitializer extends ChannelInitializer<Channel> {
 
     final PullServer PullServer;
     private SSLFactory sslFactory;
 
-    public HttpPipelineFactory(Configuration conf) throws Exception {
+    public HttpChannelInitializer(Configuration conf) throws Exception {
       PullServer = new PullServer(conf);
       if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
           ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
@@ -306,24 +307,25 @@ public class PullServerAuxService extends AuxiliaryService {
     }
 
     @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
+    protected void initChannel(Channel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
       if (sslFactory != null) {
         pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
       }
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+
       pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("decoder", new HttpRequestDecoder());
+      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", PullServer);
-      return pipeline;
       // TODO factor security manager into pipeline
       // TODO factor out encode/decode to permit binary shuffle
       // TODO factor out decode of index to permit alt. models
     }
   }
 
-  class PullServer extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
     private final Configuration conf;
     private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
     private int port;
@@ -349,33 +351,27 @@ public class PullServerAuxService extends AuxiliaryService {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
         throws Exception {
-
-      HttpRequest request = (HttpRequest) e.getMessage();
-      if (request.getMethod() != GET) {
-        sendError(ctx, METHOD_NOT_ALLOWED);
+      if (request.getMethod() != HttpMethod.GET) {
+        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
         return;
       }
 
       // Parsing the URL into key-values
-      final Map<String, List<String>> params =
-          new QueryStringDecoder(request.getUri()).getParameters();
+      final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
       final List<String> types = params.get("type");
       final List<String> taskIdList = params.get("ta");
       final List<String> stageIds = params.get("sid");
       final List<String> partitionIds = params.get("p");
 
-      if (types == null || taskIdList == null || stageIds == null
-          || partitionIds == null) {
-        sendError(ctx, "Required type, taskIds, stage Id, and partition id",
-            BAD_REQUEST);
+      if (types == null || taskIdList == null || stageIds == null || partitionIds == null) {
+        sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
       if (types.size() != 1 || stageIds.size() != 1) {
-        sendError(ctx, "Required type, taskIds, stage Id, and partition id",
-            BAD_REQUEST);
+        sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
@@ -389,12 +385,11 @@ public class PullServerAuxService extends AuxiliaryService {
       // the working dir of tajo worker for each query
       String queryBaseDir = queryId + "/output" + "/";
 
-      LOG.info("PullServer request param: repartitionType=" + repartitionType +
-          ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
+      LOG.info("PullServer request param: repartitionType=" + repartitionType + ", sid=" + sid + ", partitionId="
+          + partitionId + ", taskIds=" + taskIdList);
 
       String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
-      if (taskLocalDir == null ||
-          taskLocalDir.equals("")) {
+      if (taskLocalDir == null || taskLocalDir.equals("")) {
         LOG.error("Tajo local directory should be specified.");
       }
       LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);
@@ -402,9 +397,8 @@ public class PullServerAuxService extends AuxiliaryService {
       // if a stage requires a range partitioning
       if (repartitionType.equals("r")) {
         String ta = taskIds.get(0);
-        Path path = localFS.makeQualified(
-            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
-                + ta + "/output/", conf));
+        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
+            + "/output/", conf));
 
         String startKey = params.get("start").get(0);
         String endKey = params.get("end").get(0);
@@ -415,19 +409,19 @@ public class PullServerAuxService extends AuxiliaryService {
           chunk = getFileCunks(path, startKey, endKey, last);
         } catch (Throwable t) {
           LOG.error("ERROR Request: " + request.getUri(), t);
-          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+          sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
           return;
         }
         if (chunk != null) {
           chunks.add(chunk);
         }
 
-        // if a stage requires a hash repartition  or a scattered hash repartition
+        // if a stage requires a hash repartition or a scattered hash
+        // repartition
       } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
         for (String ta : taskIds) {
-          Path path = localFS.makeQualified(
-              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
-                  ta + "/output/" + partitionId, conf));
+          Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
+              + "/output/" + partitionId, conf));
           File file = new File(path.toUri());
           FileChunk chunk = new FileChunk(file, 0, file.length());
           chunks.add(chunk);
@@ -438,45 +432,54 @@ public class PullServerAuxService extends AuxiliaryService {
       }
 
       // Write the content.
-      Channel ch = e.getChannel();
       if (chunks.size() == 0) {
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-        ch.write(response);
-        if (!isKeepAlive(request)) {
-          ch.close();
+        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+
+        if (!HttpHeaders.isKeepAlive(request)) {
+          ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+        } else {
+          response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+          ctx.writeAndFlush(response);
         }
-      }  else {
+      } else {
         FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+        ChannelFuture writeFuture = null;
+        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
         long totalSize = 0;
         for (FileChunk chunk : file) {
           totalSize += chunk.length();
         }
-        setContentLength(response, totalSize);
+        HttpHeaders.setContentLength(response, totalSize);
 
+        if (HttpHeaders.isKeepAlive(request)) {
+          response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        }
         // Write the initial line and the header.
-        ch.write(response);
-
-        ChannelFuture writeFuture = null;
+        writeFuture = ctx.write(response);
 
         for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, ch, chunk);
+          writeFuture = sendFile(ctx, chunk);
           if (writeFuture == null) {
-            sendError(ctx, NOT_FOUND);
+            sendError(ctx, HttpResponseStatus.NOT_FOUND);
             return;
           }
         }
+        if (ctx.pipeline().get(SslHandler.class) == null) {
+          writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+        } else {
+          ctx.flush();
+        }
 
         // Decide whether to close the connection or not.
-        if (!isKeepAlive(request)) {
+        if (!HttpHeaders.isKeepAlive(request)) {
           // Close the connection when the whole content is written out.
           writeFuture.addListener(ChannelFutureListener.CLOSE);
         }
       }
+
     }
 
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                   Channel ch,
                                    FileChunk file) throws IOException {
       RandomAccessFile spill;
       try {
@@ -485,26 +488,27 @@ public class PullServerAuxService extends AuxiliaryService {
         LOG.info(file.getFile() + " not found");
         return null;
       }
-      ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) == null) {
+
+      ChannelFuture lastContentFuture;
+      if (ctx.pipeline().get(SslHandler.class) == null) {
         final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
             file.startOffset(), file.length(), manageOsCache, readaheadLength,
             readaheadPool, file.getFile().getAbsolutePath());
-        writeFuture = ch.write(partition);
-        writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
+        lastContentFuture = ctx.write(partition);
+        lastContentFuture.addListener(new FileCloseListener(partition, null, 0, null));
       } else {
         // HTTPS cannot be done with zero copy.
         final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
             file.startOffset(), file.length(), sslFileBufferSize,
             manageOsCache, readaheadLength, readaheadPool,
             file.getFile().getAbsolutePath());
-        writeFuture = ch.write(chunk);
+        lastContentFuture = ctx.write(new HttpChunkedInput(chunk));
       }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(file.length()); // optimistic
-      return writeFuture;
+      return lastContentFuture;
     }
-
+    
     private void sendError(ChannelHandlerContext ctx,
         HttpResponseStatus status) {
       sendError(ctx, "", status);
@@ -512,29 +516,26 @@ public class PullServerAuxService extends AuxiliaryService {
 
     private void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+      FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+              Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+      response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
 
       // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+      ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
+      Channel ch = ctx.channel();
       if (cause instanceof TooLongFrameException) {
-        sendError(ctx, BAD_REQUEST);
+        sendError(ctx, HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
       LOG.error("PullServer error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("PullServer error " + e);
-        sendError(ctx, INTERNAL_SERVER_ERROR);
+      if (ch.isActive()) {
+        sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
       }
     }
   }


Mime
View raw message