tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [06/10] tajo git commit: TAJO-527: Upgrade to Netty 4
Date Thu, 05 Mar 2015 06:15:58 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index 0727f71..ed6b634 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -19,73 +19,125 @@
 package org.apache.tajo.rpc;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.*;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.ThreadNameDeterminer;
 
-import java.util.concurrent.Executors;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public final class RpcChannelFactory {
   private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
-
+  
   private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
 
-  private static ClientSocketChannelFactory factory;
-  private static AtomicInteger clientCount = new AtomicInteger(0);
+  private static final Object lockObjectForLoopGroup = new Object();
   private static AtomicInteger serverCount = new AtomicInteger(0);
 
+  public enum ClientChannelId {
+    CLIENT_DEFAULT,
+    FETCHER
+  }
+
+  private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
+      new ConcurrentHashMap<ClientChannelId, Integer>();
+  private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
+      new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
+
   private RpcChannelFactory(){
   }
+  
+  static {
+    Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
+
+    defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
+    defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
+  }
 
   /**
-   * make this factory static thus all clients can share its thread pool.
-   * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
-   */
-  public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory() {
-    return getSharedClientChannelFactory(DEFAULT_WORKER_NUM);
+  * make this factory static thus all clients can share its thread pool.
+  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+  */
+  public static EventLoopGroup getSharedClientEventloopGroup() {
+    return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
+  }
+  
+  /**
+  * make this factory static thus all clients can share its thread pool.
+  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+  *
+  * @param workerNum The number of workers
+  */
+  public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
+    //shared worker and boss pool
+    return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
   }
 
   /**
-   * make this factory static thus all clients can share its thread pool.
-   * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+   * This function returns an EventLoopGroup by key. The fetcher client will have one or more EventLoopGroups for its throughput.
    *
-   * @param workerNum The number of workers
+   * @param clientId
+   * @param workerNum
+   * @return
    */
-  public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(int workerNum){
-    //shared woker and boss pool
-    if(factory == null){
-      factory = createClientChannelFactory("Internal-Client", workerNum);
+  public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
+    Queue<EventLoopGroup> eventLoopGroupQueue;
+    EventLoopGroup returnEventLoopGroup;
+
+    synchronized (lockObjectForLoopGroup) {
+      eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
+      if (eventLoopGroupQueue == null) {
+        eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
+      }
+
+      returnEventLoopGroup = eventLoopGroupQueue.poll();
+      if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
+        returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
+      }
+      eventLoopGroupQueue.add(returnEventLoopGroup);
     }
-    return factory;
+
+    return returnEventLoopGroup;
+  }
+
+  protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
+    return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
   }
 
   // Client must release the external resources
-  public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) {
-    name = name + "-" + clientCount.incrementAndGet();
-    if(LOG.isDebugEnabled()){
-      LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum);
+  protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
+    int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
+    Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
+    eventLoopGroupPool.put(clientId, loopGroupQueue);
+
+    for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
+      loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
     }
 
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build();
-    ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build();
+    return loopGroupQueue;
+  }
 
-    NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1,
-        new HashedWheelTimer(), ThreadNameDeterminer.CURRENT);
-    NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum,
-        ThreadNameDeterminer.CURRENT);
+  protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
+    }
+
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
 
-    return new NioClientSocketChannelFactory(bossPool, workerPool);
+    return new NioEventLoopGroup(workerNum, clientFactory);
   }
 
   // Client must release the external resources
-  public static synchronized ServerSocketChannelFactory createServerChannelFactory(String name, int workerNum) {
+  public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
     name = name + "-" + serverCount.incrementAndGet();
     if(LOG.isInfoEnabled()){
       LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
@@ -93,22 +145,38 @@ public final class RpcChannelFactory {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
     ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
     ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
-
-    NioServerBossPool bossPool =
-        new NioServerBossPool(Executors.newCachedThreadPool(bossFactory), 1, ThreadNameDeterminer.CURRENT);
-    NioWorkerPool workerPool =
-        new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, ThreadNameDeterminer.CURRENT);
-
-    return new NioServerSocketChannelFactory(bossPool, workerPool);
+    
+    EventLoopGroup bossGroup =
+        new NioEventLoopGroup(1, bossFactory);
+    EventLoopGroup workerGroup = 
+        new NioEventLoopGroup(workerNum, workerFactory);
+    
+    return new ServerBootstrap().group(bossGroup, workerGroup);
   }
 
-  public static synchronized void shutdown(){
+  public static void shutdownGracefully(){
     if(LOG.isDebugEnabled()) {
       LOG.debug("Shutdown Shared RPC Pool");
     }
-    if (factory != null) {
-      factory.releaseExternalResources();
+
+    synchronized(lockObjectForLoopGroup) {
+      for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
+        for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
+          eventLoopGroup.shutdownGracefully();
+        }
+
+        eventLoopGroupQueue.clear();
+      }
+      eventLoopGroupPool.clear();
+    }
+  }
+  
+  static class CleanUpHandler extends Thread {
+
+    @Override
+    public void run() {
+      RpcChannelFactory.shutdownGracefully();
     }
-    factory = null;
+    
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index c8e622b..4ad9771 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -21,79 +21,71 @@ package org.apache.tajo.rpc;
 import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.logging.CommonsLoggerFactory;
-import org.jboss.netty.logging.InternalLoggerFactory;
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.internal.logging.CommonsLoggerFactory;
+import io.netty.util.internal.logging.InternalLoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
 
 public class RpcConnectionPool {
   private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
 
-  private ConcurrentMap<RpcConnectionKey, NettyClientBase> connections =
-      new ConcurrentHashMap<RpcConnectionKey, NettyClientBase>();
-  private ChannelGroup accepted = new DefaultChannelGroup();
+  private Map<RpcConnectionKey, NettyClientBase> connections =
+      new HashMap<RpcConnectionKey, NettyClientBase>();
+  private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
   private static RpcConnectionPool instance;
-  private final ClientSocketChannelFactory channelFactory;
+  private final Object lockObject = new Object();
 
   public final static int RPC_RETRIES = 3;
 
-  private RpcConnectionPool(ClientSocketChannelFactory channelFactory) {
-    this.channelFactory =  channelFactory;
+  private RpcConnectionPool() {
   }
 
   public synchronized static RpcConnectionPool getPool() {
     if(instance == null) {
       InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
-      instance = new RpcConnectionPool(RpcChannelFactory.getSharedClientChannelFactory());
+      instance = new RpcConnectionPool();
     }
     return instance;
   }
 
-  public synchronized static RpcConnectionPool newPool(String poolName, int workerNum) {
-    return new RpcConnectionPool(RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
-  }
-
   private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
       throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
     NettyClientBase client;
     if(rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
+      client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, 
+          RPC_RETRIES);
     } else {
-      client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
+      client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, 
+          RPC_RETRIES);
     }
     accepted.add(client.getChannel());
     return client;
   }
 
   public NettyClientBase getConnection(InetSocketAddress addr,
-                                       Class protocolClass, boolean asyncMode)
+                                       Class<?> protocolClass, boolean asyncMode)
       throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
     RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
     NettyClientBase client = connections.get(key);
 
     if (client == null) {
-      boolean added;
-      synchronized (connections){
-        client = makeConnection(key);
-        connections.put(key, client);
-        added = true;
-      }
-
-      if (!added) {
-        client.close();
+      synchronized (lockObject){
         client = connections.get(key);
+        if (client == null) {
+          client = makeConnection(key);
+          connections.put(key, client);
+        }
       }
     }
 
-    if (!client.getChannel().isOpen() || !client.getChannel().isConnected()) {
+    if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) {
       LOG.warn("Try to reconnect : " + addr);
       client.connect(addr);
     }
@@ -104,9 +96,11 @@ public class RpcConnectionPool {
     if (client == null) return;
 
     try {
-      if (!client.getChannel().isOpen()) {
-        connections.remove(client.getKey());
-        client.close();
+      synchronized (lockObject) {
+        if (!client.getChannel().isOpen()) {
+          connections.remove(client.getKey());
+          client.close();
+        }
       }
 
       if(LOG.isDebugEnabled()) {
@@ -128,8 +122,10 @@ public class RpcConnectionPool {
         LOG.debug("Close connection [" + client.getKey() + "]");
       }
 
-      connections.remove(client.getKey());
-      client.close();
+      synchronized (lockObject) {
+        connections.remove(client.getKey());
+        client.close();
+      }
 
     } catch (Exception e) {
       LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
@@ -140,7 +136,7 @@ public class RpcConnectionPool {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Pool Closed");
     }
-    synchronized(connections) {
+    synchronized(lockObject) {
       for(NettyClientBase eachClient: connections.values()) {
         try {
           eachClient.close();
@@ -148,11 +144,12 @@ public class RpcConnectionPool {
           LOG.error("close client pool error", e);
         }
       }
+
+      connections.clear();
     }
 
-    connections.clear();
     try {
-      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+      accepted.close();
     } catch (Throwable t) {
       LOG.error(t);
     }
@@ -160,18 +157,16 @@ public class RpcConnectionPool {
 
   public synchronized void shutdown(){
     close();
-    if(channelFactory != null){
-      channelFactory.releaseExternalResources();
-    }
+    RpcChannelFactory.shutdownGracefully();
   }
 
   static class RpcConnectionKey {
     final InetSocketAddress addr;
-    final Class protocolClass;
+    final Class<?> protocolClass;
     final boolean asyncMode;
 
     public RpcConnectionKey(InetSocketAddress addr,
-                            Class protocolClass, boolean asyncMode) {
+                            Class<?> protocolClass, boolean asyncMode) {
       this.addr = addr;
       this.protocolClass = protocolClass;
       this.asyncMode = asyncMode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index 140f781..fb1cec2 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -18,30 +18,30 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.protobuf.ServiceException;
+
 public abstract class ServerCallable<T> {
   protected InetSocketAddress addr;
   protected long startTime;
   protected long endTime;
-  protected Class protocol;
+  protected Class<?> protocol;
   protected boolean asyncMode;
   protected boolean closeConn;
   protected RpcConnectionPool connPool;
 
   public abstract T call(NettyClientBase client) throws Exception;
 
-  public ServerCallable(RpcConnectionPool connPool,  InetSocketAddress addr, Class protocol, boolean asyncMode) {
+  public ServerCallable(RpcConnectionPool connPool,  InetSocketAddress addr, Class<?> protocol, boolean asyncMode) {
     this(connPool, addr, protocol, asyncMode, false);
   }
 
-  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol,
+  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol,
                         boolean asyncMode, boolean closeConn) {
     this.connPool = connPool;
     this.addr = addr;

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 61a92bc..31d5265 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -27,13 +27,21 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
 import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
 
+import io.netty.channel.ConnectTimeoutException;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -47,43 +55,102 @@ public class TestAsyncRpc {
   double sum;
   String echo;
 
-  static AsyncRpcServer server;
-  static AsyncRpcClient client;
-  static Interface stub;
-  static DummyProtocolAsyncImpl service;
-  ClientSocketChannelFactory clientChannelFactory;
+  AsyncRpcServer server;
+  AsyncRpcClient client;
+  Interface stub;
+  DummyProtocolAsyncImpl service;
   int retries;
+  
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @interface SetupRpcConnection {
+    boolean setupRpcServer() default true;
+    boolean setupRpcClient() default true;
+  }
+  
+  @Rule
+  public ExternalResource resource = new ExternalResource() {
+    
+    private Description description;
+
+    @Override
+    public Statement apply(Statement base, Description description) {
+      this.description = description;
+      return super.apply(base, description);
+    }
 
-  @Before
-  public void setUp() throws Exception {
-    retries = 1;
+    @Override
+    protected void before() throws Throwable {
+      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+        setUpRpcServer();
+      }
+      
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+        setUpRpcClient();
+      }
+    }
+
+    @Override
+    protected void after() {
+      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
 
-    clientChannelFactory = RpcChannelFactory.createClientChannelFactory("TestAsyncRpc", 2);
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+        try {
+          tearDownRpcClient();
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+      }
+      
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+        try {
+          tearDownRpcServer();
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+      }
+    }
+    
+  };
+  
+  public void setUpRpcServer() throws Exception {
     service = new DummyProtocolAsyncImpl();
     server = new AsyncRpcServer(DummyProtocol.class,
         service, new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
+  }
+  
+  public void setUpRpcClient() throws Exception {
+    retries = 1;
+
     client = new AsyncRpcClient(DummyProtocol.class,
-        RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+        RpcUtils.getConnectAddress(server.getListenAddress()), retries);
     stub = client.getStub();
   }
 
-  @After
-  public void tearDown() throws Exception {
-    if(client != null) {
-      client.close();
-    }
-
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    RpcChannelFactory.shutdownGracefully();
+  }
+  
+  public void tearDownRpcServer() throws Exception {
     if(server != null) {
       server.shutdown();
+      server = null;
     }
-
-    if (clientChannelFactory != null) {
-      clientChannelFactory.releaseExternalResources();
+  }
+  
+  public void tearDownRpcClient() throws Exception {
+    if(client != null) {
+      client.close();
+      client = null;
     }
   }
 
   boolean calledMarker = false;
+
   @Test
   public void testRpc() throws Exception {
 
@@ -130,7 +197,7 @@ public class TestAsyncRpc {
         testNullLatch.countDown();
       }
     });
-    testNullLatch.await(1000, TimeUnit.MILLISECONDS);
+    assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS));
     assertTrue(service.getNullCalled);
   }
 
@@ -169,8 +236,7 @@ public class TestAsyncRpc {
         .setMessage(MESSAGE).build();
     CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
 
-    server.shutdown();
-    server = null;
+    tearDownRpcServer();
 
     stub.echo(future.getController(), echoMessage, future);
     EchoMessage response = future.get();
@@ -187,8 +253,10 @@ public class TestAsyncRpc {
         .setMessage(MESSAGE).build();
     CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
 
-    server.shutdown();
-    server = null;
+    if (server != null) {
+      server.shutdown(true);
+      server = null;
+    }
 
     stub = client.getStub();
     stub.echo(future.getController(), echoMessage, future);
@@ -200,10 +268,13 @@ public class TestAsyncRpc {
   }
 
   @Test
+  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
   public void testConnectionRetry() throws Exception {
     retries = 10;
-    final InetSocketAddress address = server.getListenAddress();
-    tearDown();
+    ServerSocket serverSocket = new ServerSocket(0);
+    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+    serverSocket.close();
+    service = new DummyProtocolAsyncImpl();
 
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -214,7 +285,7 @@ public class TestAsyncRpc {
       @Override
       public void run() {
         try {
-          Thread.sleep(100);
+          Thread.sleep(1000);
           server = new AsyncRpcServer(DummyProtocol.class,
               service, address, 2);
         } catch (Exception e) {
@@ -225,8 +296,7 @@ public class TestAsyncRpc {
     });
     serverThread.start();
 
-    clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
-    client = new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+    client = new AsyncRpcClient(DummyProtocol.class, address, retries);
     stub = client.getStub();
     stub.echo(future.getController(), echoMessage, future);
 
@@ -240,7 +310,7 @@ public class TestAsyncRpc {
     InetSocketAddress address = new InetSocketAddress("test", 0);
     boolean expected = false;
     try {
-      new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+      new AsyncRpcClient(DummyProtocol.class, address, retries);
       fail();
     } catch (ConnectTimeoutException e) {
       expected = true;
@@ -251,13 +321,11 @@ public class TestAsyncRpc {
   }
 
   @Test
+  @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
-    client.close();
-    client = null;
-
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
     client = new AsyncRpcClient(DummyProtocol.class,
-        RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+        RpcUtils.createUnresolved(hostAndPort), retries);
     Interface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 746bfcb..07e2dca 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -24,13 +24,20 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
 import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.Rule;
 import org.junit.Test;
-
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -44,35 +51,92 @@ public class TestBlockingRpc {
   private BlockingInterface stub;
   private DummyProtocolBlockingImpl service;
   private int retries;
-  private ClientSocketChannelFactory clientChannelFactory;
-
-  @Before
-  public void setUp() throws Exception {
-    retries = 1;
+  
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @interface SetupRpcConnection {
+    boolean setupRpcServer() default true;
+    boolean setupRpcClient() default true;
+  }
+  
+  @Rule
+  public ExternalResource resource = new ExternalResource() {
+    
+    private Description description;
+
+    @Override
+    public Statement apply(Statement base, Description description) {
+      this.description = description;
+      return super.apply(base, description);
+    }
 
-    clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+    @Override
+    protected void before() throws Throwable {
+      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+      
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+        setUpRpcServer();
+      }
+      
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+        setUpRpcClient();
+      }
+    }
 
+    @Override
+    protected void after() {
+      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+      
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+        try {
+          tearDownRpcClient();
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+      }
+      
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+        try {
+          tearDownRpcServer();
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+      }
+    }
+    
+  };
+  
+  public void setUpRpcServer() throws Exception {
     service = new DummyProtocolBlockingImpl();
     server = new BlockingRpcServer(DummyProtocol.class, service,
         new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
+  }
+  
+  public void setUpRpcClient() throws Exception {
+    retries = 1;
+
     client = new BlockingRpcClient(DummyProtocol.class,
-        RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+        RpcUtils.getConnectAddress(server.getListenAddress()), retries);
     stub = client.getStub();
   }
 
-  @After
-  public void tearDown() throws Exception {
-    if(client != null) {
-      client.close();
-    }
-
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    RpcChannelFactory.shutdownGracefully();
+  }
+  
+  public void tearDownRpcServer() throws Exception {
     if(server != null) {
       server.shutdown();
+      server = null;
     }
-
-    if(clientChannelFactory != null){
-      clientChannelFactory.releaseExternalResources();
+  }
+  
+  public void tearDownRpcClient() throws Exception {
+    if(client != null) {
+      client.close();
+      client = null;
     }
   }
 
@@ -93,8 +157,9 @@ public class TestBlockingRpc {
   }
 
   @Test
+  @SetupRpcConnection(setupRpcClient=false)
   public void testRpcWithServiceCallable() throws Exception {
-    RpcConnectionPool pool = RpcConnectionPool.newPool(getClass().getSimpleName(), 2);
+    RpcConnectionPool pool = RpcConnectionPool.getPool();
     final SumRequest request = SumRequest.newBuilder()
         .setX1(1)
         .setX2(2)
@@ -148,10 +213,12 @@ public class TestBlockingRpc {
   }
 
   @Test
+  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
   public void testConnectionRetry() throws Exception {
     retries = 10;
-    final InetSocketAddress address = server.getListenAddress();
-    tearDown();
+    ServerSocket serverSocket = new ServerSocket(0);
+    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+    serverSocket.close();
 
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -161,8 +228,8 @@ public class TestBlockingRpc {
       @Override
       public void run() {
         try {
-          Thread.sleep(100);
-          server = new BlockingRpcServer(DummyProtocol.class, service, address, 2);
+          Thread.sleep(1000);
+          server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
         } catch (Exception e) {
           fail(e.getMessage());
         }
@@ -171,8 +238,7 @@ public class TestBlockingRpc {
     });
     serverThread.start();
 
-    clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
-    client = new BlockingRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+    client = new BlockingRpcClient(DummyProtocol.class, address, retries);
     stub = client.getStub();
 
     EchoMessage response = stub.echo(null, message);
@@ -182,14 +248,20 @@ public class TestBlockingRpc {
   @Test
   public void testConnectionFailed() throws Exception {
     boolean expected = false;
+    NettyClientBase client = null;
+    
     try {
       int port = server.getListenAddress().getPort() + 1;
-      new BlockingRpcClient(DummyProtocol.class,
-          RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries);
+      client = new BlockingRpcClient(DummyProtocol.class,
+          RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries);
+      client.close();
       fail("Connection should be failed.");
     } catch (ConnectException ce) {
       expected = true;
     } catch (Throwable ce){
+      if (client != null) {
+        client.close();
+      }
       fail();
     }
     assertTrue(expected);
@@ -240,7 +312,7 @@ public class TestBlockingRpc {
     };
     shutdownThread.start();
 
-    latch.await(5 * 1000, TimeUnit.MILLISECONDS);
+    assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS));
 
     assertTrue(latch.getCount() == 0);
 
@@ -254,13 +326,11 @@ public class TestBlockingRpc {
   }
 
   @Test
+  @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
-    client.close();
-    client = null;
-
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
     client = new BlockingRpcClient(DummyProtocol.class,
-        RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+        RpcUtils.createUnresolved(hostAndPort), retries);
     BlockingInterface stub = client.getStub();
 
     EchoMessage message = EchoMessage.newBuilder()

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
index 90499ce..0ca7563 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
@@ -27,7 +27,6 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
 import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 
-@SuppressWarnings("UnusedDeclaration")
 public class DummyProtocolAsyncImpl implements Interface {
   private static final Log LOG =
       LogFactory.getLog(DummyProtocolAsyncImpl.class);
@@ -74,7 +73,7 @@ public class DummyProtocolAsyncImpl implements Interface {
     try {
       Thread.sleep(3000);
     } catch (InterruptedException e) {
-      e.printStackTrace();
+      LOG.error(e.getMessage());
     }
 
     done.run(request);

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 5513aa6..957b4c1 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -168,6 +168,18 @@ limitations under the License.
 
   <dependencies>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-common</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
index cf8a54e..389cd31 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
@@ -21,13 +21,16 @@ package org.apache.tajo;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
@@ -38,20 +41,20 @@ public class HttpFileServer {
   private final InetSocketAddress addr;
   private InetSocketAddress bindAddr;
   private ServerBootstrap bootstrap = null;
-  private ChannelFactory factory = null;
+  private EventLoopGroup eventloopGroup = null;
   private ChannelGroup channelGroup = null;
 
   public HttpFileServer(final InetSocketAddress addr) {
     this.addr = addr;
-    this.factory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-        2);
+    this.eventloopGroup = new NioEventLoopGroup(2, Executors.defaultThreadFactory());
 
     // Configure the server.
-    this.bootstrap = new ServerBootstrap(factory);
-    // Set up the event pipeline factory.
-    this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
-    this.channelGroup = new DefaultChannelGroup();
+    this.bootstrap = new ServerBootstrap();
+    this.bootstrap.childHandler(new HttpFileServerChannelInitializer())
+          .group(eventloopGroup)
+          .option(ChannelOption.TCP_NODELAY, true)
+          .channel(NioServerSocketChannel.class);
+    this.channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
   }
 
   public HttpFileServer(String bindaddr) {
@@ -60,9 +63,9 @@ public class HttpFileServer {
 
   public void start() {
     // Bind and start to accept incoming connections.
-    Channel channel = bootstrap.bind(addr);
-    channelGroup.add(channel);    
-    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+    ChannelFuture future = bootstrap.bind(addr).syncUninterruptibly();
+    channelGroup.add(future.channel());    
+    this.bindAddr = (InetSocketAddress) future.channel().localAddress();
     LOG.info("HttpFileServer starts up ("
         + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
         + ")");
@@ -73,9 +76,8 @@ public class HttpFileServer {
   }
 
   public void stop() {
-    ChannelGroupFuture future = channelGroup.close();
-    future.awaitUninterruptibly();
-    factory.releaseExternalResources();
+    channelGroup.close();
+    eventloopGroup.shutdownGracefully();
 
     LOG.info("HttpFileServer shutdown ("
         + this.bindAddr.getAddress().getHostAddress() + ":"

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
new file mode 100644
index 0000000..f2a97b6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+public class HttpFileServerChannelInitializer extends ChannelInitializer<Channel> {
+
+  @Override
+  protected void initChannel(Channel channel) throws Exception {
+    ChannelPipeline pipeline = channel.pipeline();
+
+    // Uncomment the following lines if you want HTTPS
+    //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
+    //engine.setUseClientMode(false);
+    //pipeline.addLast("ssl", new SslHandler(engine));
+
+    pipeline.addLast("encoder", new HttpResponseEncoder());
+    pipeline.addLast("decoder", new HttpRequestDecoder());
+    pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
+    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+
+    pipeline.addLast("handler", new HttpFileServerHandler());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
index 6c77317..78902f3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
@@ -18,16 +18,13 @@
 
 package org.apache.tajo;
 
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -35,39 +32,34 @@ import java.io.RandomAccessFile;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
-/**
- * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6
- */
-public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
+public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+  
+  private final Log LOG = LogFactory.getLog(HttpFileServerHandler.class);
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
+  public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+
+    if (request.getMethod() != HttpMethod.GET) {
+      sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
       return;
     }
 
     final String path = sanitizeUri(request.getUri());
     if (path == null) {
-      sendError(ctx, FORBIDDEN);
+      sendError(ctx, HttpResponseStatus.FORBIDDEN);
       return;
     }
 
     File file = new File(path);
     if (file.isHidden() || !file.exists()) {
-      sendError(ctx, NOT_FOUND);
+      sendError(ctx, HttpResponseStatus.NOT_FOUND);
       return;
     }
     if (!file.isFile()) {
-      sendError(ctx, FORBIDDEN);
+      sendError(ctx, HttpResponseStatus.FORBIDDEN);
       return;
     }
 
@@ -75,62 +67,62 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
     try {
       raf = new RandomAccessFile(file, "r");
     } catch (FileNotFoundException fnfe) {
-      sendError(ctx, NOT_FOUND);
+      sendError(ctx, HttpResponseStatus.NOT_FOUND);
       return;
     }
     long fileLength = raf.length();
 
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-    setContentLength(response, fileLength);
+    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    HttpHeaders.setContentLength(response, fileLength);
     setContentTypeHeader(response);
 
-    Channel ch = e.getChannel();
-
     // Write the initial line and the header.
-    ch.write(response);
+    ctx.write(response);
 
     // Write the content.
     ChannelFuture writeFuture;
-    if (ch.getPipeline().get(SslHandler.class) != null) {
+    ChannelFuture lastContentFuture;
+    if (ctx.pipeline().get(SslHandler.class) != null) {
       // Cannot use zero-copy with HTTPS.
-      writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
+      lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)));
     } else {
       // No encryption - use zero-copy.
-      final FileRegion region =
-          new DefaultFileRegion(raf.getChannel(), 0, fileLength);
-      writeFuture = ch.write(region);
-      writeFuture.addListener(new ChannelFutureProgressListener() {
-        public void operationComplete(ChannelFuture future) {
-          region.releaseExternalResources();
+      final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
+      writeFuture = ctx.write(region);
+      lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+      writeFuture.addListener(new ChannelProgressiveFutureListener() {
+        @Override
+        public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
+            throws Exception {
+          LOG.trace(String.format("%s: %d / %d", path, progress, total));
         }
 
-        public void operationProgressed(
-            ChannelFuture future, long amount, long current, long total) {
-          System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
+        @Override
+        public void operationComplete(ChannelProgressiveFuture future) throws Exception {
+          region.release();
         }
       });
     }
 
     // Decide whether to close the connection or not.
-    if (!isKeepAlive(request)) {
+    if (!HttpHeaders.isKeepAlive(request)) {
       // Close the connection when the whole content is written out.
-      writeFuture.addListener(ChannelFutureListener.CLOSE);
+      lastContentFuture.addListener(ChannelFutureListener.CLOSE);
     }
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
       throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
+    Channel ch = ctx.channel();
     if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
+      sendError(ctx, HttpResponseStatus.BAD_REQUEST);
       return;
     }
 
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
+    LOG.error(cause.getMessage(), cause);
+    if (ch.isActive()) {
+      sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
     }
   }
 
@@ -161,14 +153,13 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
   }
 
   private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n",
+    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+        Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n",
         CharsetUtil.UTF_8));
+    response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
 
     // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
   }
 
   /**
@@ -178,7 +169,7 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
    *            HTTP response
    */
   private static void setContentTypeHeader(HttpResponse response) {
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
deleted file mode 100644
index cecf93b..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-// Uncomment the following lines if you want HTTPS
-//import javax.net.ssl.SSLEngine;
-//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
-//import org.jboss.netty.handler.ssl.SslHandler;
-
-//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6
-public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
-  public ChannelPipeline getPipeline() throws Exception {
-    // Create a default pipeline implementation.
-    ChannelPipeline pipeline = pipeline();
-
-    // Uncomment the following lines if you want HTTPS
-    //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
-    //engine.setUseClientMode(false);
-    //pipeline.addLast("ssl", new SslHandler(engine));
-
-    pipeline.addLast("decoder", new HttpRequestDecoder());
-    pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
-    pipeline.addLast("encoder", new HttpResponseEncoder());
-    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-
-    pipeline.addLast("handler", new HttpFileServerHandler());
-    return pipeline;
-  }
-}
\ No newline at end of file


Mime
View raw message