tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [10/13] tajo git commit: TAJO-1337: Implements common modules to handle RESTful API
Date Wed, 18 Mar 2015 17:25:49 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
deleted file mode 100644
index ed6b634..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public final class RpcChannelFactory {
-  private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
-  
-  private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
-
-  private static final Object lockObjectForLoopGroup = new Object();
-  private static AtomicInteger serverCount = new AtomicInteger(0);
-
-  public enum ClientChannelId {
-    CLIENT_DEFAULT,
-    FETCHER
-  }
-
-  private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
-      new ConcurrentHashMap<ClientChannelId, Integer>();
-  private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
-      new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
-
-  private RpcChannelFactory(){
-  }
-  
-  static {
-    Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
-
-    defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
-    defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
-  }
-
-  /**
-  * make this factory static thus all clients can share its thread pool.
-  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
-  */
-  public static EventLoopGroup getSharedClientEventloopGroup() {
-    return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
-  }
-  
-  /**
-  * make this factory static thus all clients can share its thread pool.
-  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
-  *
-  * @param workerNum The number of workers
-  */
-  public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
-    //shared woker and boss pool
-    return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
-  }
-
-  /**
-   * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput.
-   *
-   * @param clientId
-   * @param workerNum
-   * @return
-   */
-  public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
-    Queue<EventLoopGroup> eventLoopGroupQueue;
-    EventLoopGroup returnEventLoopGroup;
-
-    synchronized (lockObjectForLoopGroup) {
-      eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
-      if (eventLoopGroupQueue == null) {
-        eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
-      }
-
-      returnEventLoopGroup = eventLoopGroupQueue.poll();
-      if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
-        returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
-      }
-      eventLoopGroupQueue.add(returnEventLoopGroup);
-    }
-
-    return returnEventLoopGroup;
-  }
-
-  protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
-    return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
-  }
-
-  // Client must release the external resources
-  protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
-    int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
-    Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
-    eventLoopGroupPool.put(clientId, loopGroupQueue);
-
-    for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
-      loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
-    }
-
-    return loopGroupQueue;
-  }
-
-  protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
-    }
-
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
-
-    return new NioEventLoopGroup(workerNum, clientFactory);
-  }
-
-  // Client must release the external resources
-  public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
-    name = name + "-" + serverCount.incrementAndGet();
-    if(LOG.isInfoEnabled()){
-      LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
-    }
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
-    ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
-    
-    EventLoopGroup bossGroup =
-        new NioEventLoopGroup(1, bossFactory);
-    EventLoopGroup workerGroup = 
-        new NioEventLoopGroup(workerNum, workerFactory);
-    
-    return new ServerBootstrap().group(bossGroup, workerGroup);
-  }
-
-  public static void shutdownGracefully(){
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Shutdown Shared RPC Pool");
-    }
-
-    synchronized(lockObjectForLoopGroup) {
-      for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
-        for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
-          eventLoopGroup.shutdownGracefully();
-        }
-
-        eventLoopGroupQueue.clear();
-      }
-      eventLoopGroupPool.clear();
-    }
-  }
-  
-  static class CleanUpHandler extends Thread {
-
-    @Override
-    public void run() {
-      RpcChannelFactory.shutdownGracefully();
-    }
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
deleted file mode 100644
index 6d1f479..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import io.netty.channel.ConnectTimeoutException;
-import io.netty.util.internal.logging.CommonsLoggerFactory;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-public class RpcConnectionPool {
-  private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
-
-  private Map<RpcConnectionKey, NettyClientBase> connections =
-      new HashMap<RpcConnectionKey, NettyClientBase>();
-
-  private static RpcConnectionPool instance;
-  private final Object lockObject = new Object();
-
-  public final static int RPC_RETRIES = 3;
-
-  private RpcConnectionPool() {
-  }
-
-  public synchronized static RpcConnectionPool getPool() {
-    if(instance == null) {
-      InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
-      instance = new RpcConnectionPool();
-    }
-    return instance;
-  }
-
-  private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    NettyClientBase client;
-    if(rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES);
-    } else {
-      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES);
-    }
-    return client;
-  }
-
-  public static final long DEFAULT_TIMEOUT = 3000;
-  public static final long DEFAULT_INTERVAL = 500;
-
-  public NettyClientBase getConnection(InetSocketAddress addr,
-                                       Class<?> protocolClass, boolean asyncMode)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL);
-  }
-
-  public NettyClientBase getConnection(InetSocketAddress addr,
-      Class<?> protocolClass, boolean asyncMode, long timeout, long interval)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
-
-    RpcUtils.Timer timer = new RpcUtils.Timer(timeout);
-    for (; !timer.isTimedOut(); timer.elapsed()) {
-      NettyClientBase client;
-      synchronized (lockObject) {
-        client = connections.get(key);
-        if (client == null) {
-          connections.put(key, client = makeConnection(key));
-        }
-      }
-      if (client.acquire(timer.remaining())) {
-        return client;
-      }
-      timer.interval(interval);
-    }
-
-    throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec");
-  }
-
-  public void releaseConnection(NettyClientBase client) {
-    release(client, false);
-  }
-
-  public void closeConnection(NettyClientBase client) {
-    release(client, true);
-  }
-
-  private void release(NettyClientBase client, boolean close) {
-    if (client == null) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Close connection [" + client.getKey() + "]");
-    }
-    try {
-      if (returnToPool(client, close)) {
-        client.close();
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Current Connections [" + connections.size() + "]");
-      }
-    } catch (Exception e) {
-      LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
-    }
-  }
-
-  // return true if the connection should be closed
-  private boolean returnToPool(NettyClientBase client, boolean close) {
-    synchronized (lockObject) {
-      if (client.release() && (close || !client.isConnected())) {
-        connections.remove(client.getKey());
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void close() {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Pool Closed");
-    }
-
-    synchronized (lockObject) {
-      for (NettyClientBase eachClient : connections.values()) {
-        try {
-          eachClient.close();
-        } catch (Exception e) {
-          LOG.error("close client pool error", e);
-        }
-      }
-      connections.clear();
-    }
-  }
-
-  public void shutdown(){
-    close();
-    RpcChannelFactory.shutdownGracefully();
-  }
-
-  static class RpcConnectionKey {
-    final InetSocketAddress addr;
-    final Class<?> protocolClass;
-    final boolean asyncMode;
-
-    final String description;
-
-    public RpcConnectionKey(InetSocketAddress addr,
-                            Class<?> protocolClass, boolean asyncMode) {
-      this.addr = addr;
-      this.protocolClass = protocolClass;
-      this.asyncMode = asyncMode;
-      this.description = "["+ protocolClass + "] " + addr + "," + asyncMode;
-    }
-
-    @Override
-    public String toString() {
-      return description;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if(!(obj instanceof RpcConnectionKey)) {
-        return false;
-      }
-
-      return toString().equals(obj.toString());
-    }
-
-    @Override
-    public int hashCode() {
-      return description.hashCode();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
deleted file mode 100644
index 152d426..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class RpcUtils {
-
-  public static String normalizeInetSocketAddress(InetSocketAddress addr) {
-    return addr.getAddress().getHostAddress() + ":" + addr.getPort();
-  }
-
-  /**
-   * Util method to build socket addr from either:
-   *   <host>
-   *   <host>:<port>
-   *   <fs>://<host>:<port>/<path>
-   */
-  public static InetSocketAddress createSocketAddr(String host, int port) {
-    return new InetSocketAddress(host, port);
-  }
-
-  /**
-   * Returns InetSocketAddress that a client can use to
-   * connect to the server. NettyServerBase.getListenerAddress() is not correct when
-   * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
-   * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
-   *
-   * @param addr of a listener
-   * @return socket address that a client can use to connect to the server.
-   */
-  public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
-    if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
-      try {
-        addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
-      } catch (UnknownHostException uhe) {
-        // shouldn't get here unless the host doesn't have a loopback iface
-        addr = new InetSocketAddress("127.0.0.1", addr.getPort());
-      }
-    }
-    InetSocketAddress canonicalAddress =
-        new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort());
-    return canonicalAddress;
-  }
-
-  public static InetSocketAddress createUnresolved(String addr) {
-    String [] splitted = addr.split(":");
-    return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
-  }
-
-  public static class Timer {
-    private long remaining;
-    private long prev;
-    public Timer(long timeout) {
-      this.remaining = timeout;
-      this.prev = System.currentTimeMillis();
-    }
-
-    public boolean isTimedOut() {
-      return remaining <= 0;
-    }
-
-    public void elapsed() {
-      long current = System.currentTimeMillis();
-      remaining -= (prev - current);
-      prev = current;
-    }
-
-    public void interval(long wait) {
-      if (wait <= 0 || isTimedOut()) {
-        return;
-      }
-      try {
-        Thread.sleep(Math.min(remaining, wait));
-      } catch (Exception ex) {
-        // ignore
-      }
-    }
-
-    public long remaining() {
-      return remaining;
-    }
-  }
-
-  public static class Scrutineer<T> {
-
-    private final AtomicReference<T> reference = new AtomicReference<T>();
-
-    T check(T ticket) {
-      T granted = reference.get();
-      for (;granted == null; granted = reference.get()) {
-        if (reference.compareAndSet(null, ticket)) {
-          return ticket;
-        }
-      }
-      return granted;
-    }
-
-    boolean clear(T granted) {
-      return reference.compareAndSet(granted, null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
deleted file mode 100644
index fb1cec2..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.protobuf.ServiceException;
-
-public abstract class ServerCallable<T> {
-  protected InetSocketAddress addr;
-  protected long startTime;
-  protected long endTime;
-  protected Class<?> protocol;
-  protected boolean asyncMode;
-  protected boolean closeConn;
-  protected RpcConnectionPool connPool;
-
-  public abstract T call(NettyClientBase client) throws Exception;
-
-  public ServerCallable(RpcConnectionPool connPool,  InetSocketAddress addr, Class<?> protocol, boolean asyncMode) {
-    this(connPool, addr, protocol, asyncMode, false);
-  }
-
-  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol,
-                        boolean asyncMode, boolean closeConn) {
-    this.connPool = connPool;
-    this.addr = addr;
-    this.protocol = protocol;
-    this.asyncMode = asyncMode;
-    this.closeConn = closeConn;
-  }
-
-  public void beforeCall() {
-    this.startTime = System.currentTimeMillis();
-  }
-
-  public long getStartTime(){
-    return startTime;
-  }
-
-  public void afterCall() {
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public long getEndTime(){
-    return endTime;
-  }
-
-  boolean abort = false;
-  public void abort() {
-    abort = true;
-  }
-  /**
-   * Run this instance with retries, timed waits,
-   * and refinds of missing regions.
-   *
-   * @param <T> the type of the return value
-   * @return an object of type T
-   * @throws com.google.protobuf.ServiceException if a remote or network exception occurs
-   */
-  public T withRetries() throws ServiceException {
-    //TODO configurable
-    final long pause = 500; //ms
-    final int numRetries = 3;
-    List<Throwable> exceptions = new ArrayList<Throwable>();
-
-    for (int tries = 0; tries < numRetries; tries++) {
-      NettyClientBase client = null;
-      try {
-        beforeCall();
-        if(addr != null) {
-          client = connPool.getConnection(addr, protocol, asyncMode);
-        }
-        return call(client);
-      } catch (IOException ioe) {
-        exceptions.add(ioe);
-        if(abort) {
-          throw new ServiceException(ioe.getMessage(), ioe);
-        }
-        if (tries == numRetries - 1) {
-          throw new ServiceException("Giving up after tries=" + tries, ioe);
-        }
-      } catch (Throwable t) {
-        throw new ServiceException(t);
-      } finally {
-        afterCall();
-        if(closeConn) {
-          connPool.closeConnection(client);
-        } else {
-          connPool.releaseConnection(client);
-        }
-      }
-      try {
-        Thread.sleep(pause * (tries + 1));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceException("Giving up after tries=" + tries, e);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Run this instance against the server once.
-   * @param <T> the type of the return value
-   * @return an object of type T
-   * @throws java.io.IOException if a remote or network exception occurs
-   * @throws RuntimeException other unspecified error
-   */
-  public T withoutRetries() throws IOException, RuntimeException {
-    NettyClientBase client = null;
-    try {
-      beforeCall();
-      client = connPool.getConnection(addr, protocol, asyncMode);
-      return call(client);
-    } catch (Throwable t) {
-      Throwable t2 = translateException(t);
-      if (t2 instanceof IOException) {
-        throw (IOException)t2;
-      } else {
-        throw new RuntimeException(t2);
-      }
-    } finally {
-      afterCall();
-      if(closeConn) {
-        connPool.closeConnection(client);
-      } else {
-        connPool.releaseConnection(client);
-      }
-    }
-  }
-
-  private static Throwable translateException(Throwable t) throws IOException {
-    if (t instanceof UndeclaredThrowableException) {
-      t = t.getCause();
-    }
-    if (t instanceof RemoteException && t.getCause() != null) {
-      t = t.getCause();
-    }
-    return t;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
deleted file mode 100644
index 113d181..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ServiceException;
-import org.apache.commons.lang.exception.ExceptionUtils;
-
-public class TajoServiceException extends ServiceException {
-  private String traceMessage;
-  private String protocol;
-  private String remoteAddress;
-
-  public TajoServiceException(String message) {
-    super(message);
-  }
-  public TajoServiceException(String message, String traceMessage) {
-    super(message);
-    this.traceMessage = traceMessage;
-  }
-
-  public TajoServiceException(String message, Throwable cause, String protocol, String remoteAddress) {
-    super(message, cause);
-
-    this.protocol = protocol;
-    this.remoteAddress = remoteAddress;
-  }
-
-  public String getTraceMessage() {
-    if(traceMessage == null && getCause() != null){
-      this.traceMessage = ExceptionUtils.getStackTrace(getCause());
-    }
-    return traceMessage;
-  }
-
-  public String getProtocol() {
-    return protocol;
-  }
-
-  public String getRemoteAddress() {
-    return remoteAddress;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/DummyProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/DummyProtos.proto b/tajo-rpc/src/main/proto/DummyProtos.proto
deleted file mode 100644
index f53f0d6..0000000
--- a/tajo-rpc/src/main/proto/DummyProtos.proto
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc.test";
-option java_outer_classname = "DummyProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-message MulRequest1 {
-	required int32 x1 = 1;
-	required int32 x2 = 2;
-}
-
-message MulRequest2 {
-	required int32 x1 = 1;
-	required int32 x2 = 2;
-}
-
-message MulResponse {
-	required int32 result1 = 1;
-	required int32 result2 = 2;
-}
-
-message InnerNode {
-	required string instr = 1;
-}
-
-message InnerRequest {
-	repeated InnerNode nodes = 1;
-}
-
-message InnerResponse {
-	repeated InnerNode nodes = 1;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/RpcProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/RpcProtos.proto b/tajo-rpc/src/main/proto/RpcProtos.proto
deleted file mode 100644
index 69f43ed..0000000
--- a/tajo-rpc/src/main/proto/RpcProtos.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc";
-option java_outer_classname = "RpcProtos";
-
-message RpcRequest {
-  required int32 id = 1;
-  required string method_name = 2;
-  optional bytes request_message = 3;
-}
-
-message RpcResponse {
-  required int32 id = 1;
-  optional bytes response_message = 2;
-  optional string error_class = 3;
-  optional string error_message = 4;
-  optional string error_trace = 5;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/TestProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/TestProtocol.proto b/tajo-rpc/src/main/proto/TestProtocol.proto
deleted file mode 100644
index 58640ea..0000000
--- a/tajo-rpc/src/main/proto/TestProtocol.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc.test";
-option java_outer_classname = "DummyProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "TestProtos.proto";
-
-service DummyProtocolService {
-  rpc sum (SumRequest) returns (SumResponse);
-  rpc echo (EchoMessage) returns (EchoMessage);
-  rpc getError (EchoMessage) returns (EchoMessage);
-  rpc getNull (EchoMessage) returns (EchoMessage);
-  rpc deley (EchoMessage) returns (EchoMessage);
-  rpc throwException (EchoMessage) returns (EchoMessage);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/TestProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/TestProtos.proto b/tajo-rpc/src/main/proto/TestProtos.proto
deleted file mode 100644
index 5001c0e..0000000
--- a/tajo-rpc/src/main/proto/TestProtos.proto
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc.test";
-option java_outer_classname = "TestProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-message EchoMessage {
-  required string message = 1;
-}
-
-message SumRequest {
-  required int32 x1 = 1;
-  required int64 x2 = 2;
-  required double x3 = 3;
-  required float x4 = 4;
-}
-
-message SumResponse {
-  required double result = 1;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/log4j.properties b/tajo-rpc/src/test/java/log4j.properties
deleted file mode 100644
index 2c4d991..0000000
--- a/tajo-rpc/src/test/java/log4j.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# log4j configuration used during build and unit tests
-
-log4j.rootLogger=info,stdout
-log4j.threshhold=ALL
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p: %c (%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
deleted file mode 100644
index a974a65..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.junit.AfterClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-
-public class TestAsyncRpc {
-  private static Log LOG = LogFactory.getLog(TestAsyncRpc.class);
-  private static String MESSAGE = "TestAsyncRpc";
-
-  double sum;
-  String echo;
-
-  AsyncRpcServer server;
-  AsyncRpcClient client;
-  Interface stub;
-  DummyProtocolAsyncImpl service;
-  int retries;
-  
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  @interface SetupRpcConnection {
-    boolean setupRpcServer() default true;
-    boolean setupRpcClient() default true;
-  }
-  
-  @Rule
-  public ExternalResource resource = new ExternalResource() {
-    
-    private Description description;
-
-    @Override
-    public Statement apply(Statement base, Description description) {
-      this.description = description;
-      return super.apply(base, description);
-    }
-
-    @Override
-    protected void before() throws Throwable {
-      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
-        setUpRpcServer();
-      }
-      
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
-        setUpRpcClient();
-      }
-    }
-
-    @Override
-    protected void after() {
-      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
-        try {
-          tearDownRpcClient();
-        } catch (Exception e) {
-          fail(e.getMessage());
-        }
-      }
-      
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
-        try {
-          tearDownRpcServer();
-        } catch (Exception e) {
-          fail(e.getMessage());
-        }
-      }
-    }
-    
-  };
-  
-  public void setUpRpcServer() throws Exception {
-    service = new DummyProtocolAsyncImpl();
-    server = new AsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0), 2);
-    server.start();
-  }
-  
-  public void setUpRpcClient() throws Exception {
-    retries = 1;
-
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
-              RpcUtils.getConnectAddress(server.getListenAddress()),
-              DummyProtocol.class, true);
-    client = new AsyncRpcClient(rpcConnectionKey, retries);
-    client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT);
-    stub = client.getStub();
-  }
-
-  @AfterClass
-  public static void tearDownClass() throws Exception {
-    RpcChannelFactory.shutdownGracefully();
-  }
-  
-  public void tearDownRpcServer() throws Exception {
-    if(server != null) {
-      server.shutdown();
-      server = null;
-    }
-  }
-  
-  public void tearDownRpcClient() throws Exception {
-    if(client != null) {
-      client.close();
-      client = null;
-    }
-  }
-
-  boolean calledMarker = false;
-
-  @Test
-  public void testRpc() throws Exception {
-
-    SumRequest sumRequest = SumRequest.newBuilder()
-        .setX1(1)
-        .setX2(2)
-        .setX3(3.15d)
-        .setX4(2.0f).build();
-
-    stub.sum(null, sumRequest, new RpcCallback<SumResponse>() {
-      @Override
-      public void run(SumResponse parameter) {
-        sum = parameter.getResult();
-        assertTrue(8.15d == sum);
-      }
-    });
-
-
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
-      @Override
-      public void run(EchoMessage parameter) {
-        echo = parameter.getMessage();
-        assertEquals(MESSAGE, echo);
-        calledMarker = true;
-      }
-    };
-    stub.echo(null, echoMessage, callback);
-    Thread.sleep(1000);
-    assertTrue(calledMarker);
-  }
-
-  private CountDownLatch testNullLatch;
-
-  @Test
-  public void testGetNull() throws Exception {
-    testNullLatch = new CountDownLatch(1);
-    stub.getNull(null, null, new RpcCallback<EchoMessage>() {
-      @Override
-      public void run(EchoMessage parameter) {
-        assertNull(parameter);
-        LOG.info("testGetNull retrieved");
-        testNullLatch.countDown();
-      }
-    });
-    assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS));
-    assertTrue(service.getNullCalled);
-  }
-
-  @Test
-  public void testCallFuture() throws Exception {
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-    stub.deley(null, echoMessage, future);
-
-    assertFalse(future.isDone());
-    assertEquals(future.get(), echoMessage);
-    assertTrue(future.isDone());
-  }
-
-  @Test
-  public void testCallFutureTimeout() throws Exception {
-    boolean timeout = false;
-    try {
-      EchoMessage echoMessage = EchoMessage.newBuilder()
-          .setMessage(MESSAGE).build();
-      CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-      stub.deley(null, echoMessage, future);
-
-      assertFalse(future.isDone());
-      future.get(1, TimeUnit.SECONDS);
-    } catch (TimeoutException te) {
-      timeout = true;
-    }
-    assertTrue(timeout);
-  }
-
-  @Test
-  public void testCallFutureDisconnected() throws Exception {
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-
-    tearDownRpcServer();
-
-    stub.echo(future.getController(), echoMessage, future);
-    EchoMessage response = future.get();
-
-    assertNull(response);
-    assertTrue(future.getController().failed());
-    assertTrue(future.getController().errorText() != null);
-  }
-
-  @Test
-  public void testStubDisconnected() throws Exception {
-
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-
-    if (server != null) {
-      server.shutdown(true);
-      server = null;
-    }
-
-    stub = client.getStub();
-    stub.echo(future.getController(), echoMessage, future);
-    EchoMessage response = future.get();
-
-    assertNull(response);
-    assertTrue(future.getController().failed());
-    assertTrue(future.getController().errorText() != null);
-  }
-
-  @Test
-  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
-  public void testConnectionRetry() throws Exception {
-    retries = 10;
-    ServerSocket serverSocket = new ServerSocket(0);
-    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
-    serverSocket.close();
-    service = new DummyProtocolAsyncImpl();
-
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-
-    //lazy startup
-    Thread serverThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(1000);
-          server = new AsyncRpcServer(DummyProtocol.class,
-              service, address, 2);
-        } catch (Exception e) {
-          fail(e.getMessage());
-        }
-        server.start();
-      }
-    });
-    serverThread.start();
-
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
-    client = new AsyncRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-    stub = client.getStub();
-    stub.echo(future.getController(), echoMessage, future);
-
-    assertFalse(future.isDone());
-    assertEquals(echoMessage, future.get());
-    assertTrue(future.isDone());
-  }
-
-  @Test
-  public void testConnectionFailure() throws Exception {
-    InetSocketAddress address = new InetSocketAddress("test", 0);
-    try {
-      RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
-      NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
-      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-    } catch (Throwable throwable) {
-      fail();
-    }
-  }
-
-  @Test
-  @SetupRpcConnection(setupRpcClient=false)
-  public void testUnresolvedAddress() throws Exception {
-    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
-              RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
-    client = new AsyncRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-    Interface stub = client.getStub();
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-    stub.deley(null, echoMessage, future);
-
-    assertFalse(future.isDone());
-    assertEquals(future.get(), echoMessage);
-    assertTrue(future.isDone());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
deleted file mode 100644
index 10dd766..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.junit.AfterClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-public class TestBlockingRpc {
-  public static final String MESSAGE = "TestBlockingRpc";
-
-  private BlockingRpcServer server;
-  private BlockingRpcClient client;
-  private BlockingInterface stub;
-  private DummyProtocolBlockingImpl service;
-  private int retries;
-  
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  @interface SetupRpcConnection {
-    boolean setupRpcServer() default true;
-    boolean setupRpcClient() default true;
-  }
-  
-  @Rule
-  public ExternalResource resource = new ExternalResource() {
-    
-    private Description description;
-
-    @Override
-    public Statement apply(Statement base, Description description) {
-      this.description = description;
-      return super.apply(base, description);
-    }
-
-    @Override
-    protected void before() throws Throwable {
-      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-      
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
-        setUpRpcServer();
-      }
-      
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
-        setUpRpcClient();
-      }
-    }
-
-    @Override
-    protected void after() {
-      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-      
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
-        try {
-          tearDownRpcClient();
-        } catch (Exception e) {
-          fail(e.getMessage());
-        }
-      }
-      
-      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
-        try {
-          tearDownRpcServer();
-        } catch (Exception e) {
-          fail(e.getMessage());
-        }
-      }
-    }
-    
-  };
-  
-  public void setUpRpcServer() throws Exception {
-    service = new DummyProtocolBlockingImpl();
-    server = new BlockingRpcServer(DummyProtocol.class, service,
-        new InetSocketAddress("127.0.0.1", 0), 2);
-    server.start();
-  }
-  
-  public void setUpRpcClient() throws Exception {
-    retries = 1;
-
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
-              RpcUtils.getConnectAddress(server.getListenAddress()),
-              DummyProtocol.class, false);
-    client = new BlockingRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-    stub = client.getStub();
-  }
-
-  @AfterClass
-  public static void tearDownClass() throws Exception {
-    RpcChannelFactory.shutdownGracefully();
-  }
-  
-  public void tearDownRpcServer() throws Exception {
-    if(server != null) {
-      server.shutdown();
-      server = null;
-    }
-  }
-  
-  public void tearDownRpcClient() throws Exception {
-    if(client != null) {
-      client.close();
-      client = null;
-    }
-  }
-
-  @Test
-  public void testRpc() throws Exception {
-    SumRequest request = SumRequest.newBuilder()
-        .setX1(1)
-        .setX2(2)
-        .setX3(3.15d)
-        .setX4(2.0f).build();
-    SumResponse response1 = stub.sum(null, request);
-    assertEquals(8.15d, response1.getResult(), 1e-15);
-
-    EchoMessage message = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    EchoMessage response2 = stub.echo(null, message);
-    assertEquals(MESSAGE, response2.getMessage());
-  }
-
-  @Test
-  @SetupRpcConnection(setupRpcClient=false)
-  public void testRpcWithServiceCallable() throws Exception {
-    RpcConnectionPool pool = RpcConnectionPool.getPool();
-    final SumRequest request = SumRequest.newBuilder()
-        .setX1(1)
-        .setX2(2)
-        .setX3(3.15d)
-        .setX4(2.0f).build();
-
-    SumResponse response =
-    new ServerCallable<SumResponse>(pool,
-        server.getListenAddress(), DummyProtocol.class, false) {
-      @Override
-      public SumResponse call(NettyClientBase client) throws Exception {
-        BlockingInterface stub2 = client.getStub();
-        SumResponse response1 = stub2.sum(null, request);
-        return response1;
-      }
-    }.withRetries();
-
-    assertEquals(8.15d, response.getResult(), 1e-15);
-
-    response =
-        new ServerCallable<SumResponse>(pool,
-            server.getListenAddress(), DummyProtocol.class, false) {
-          @Override
-          public SumResponse call(NettyClientBase client) throws Exception {
-            BlockingInterface stub2 = client.getStub();
-            SumResponse response1 = stub2.sum(null, request);
-            return response1;
-          }
-        }.withoutRetries();
-
-    assertTrue(8.15d == response.getResult());
-    pool.close();
-  }
-
-  @Test
-  public void testThrowException() throws Exception {
-    EchoMessage message = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-
-    try {
-      stub.throwException(null, message);
-      fail("RpcCall should throw exception");
-    } catch (Throwable t) {
-      assertTrue(t instanceof TajoServiceException);
-      assertEquals("Exception Test", t.getMessage());
-      TajoServiceException te = (TajoServiceException)t;
-      assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol());
-      assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(),
-          te.getRemoteAddress());
-    }
-  }
-
-  @Test
-  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
-  public void testConnectionRetry() throws Exception {
-    retries = 10;
-    ServerSocket serverSocket = new ServerSocket(0);
-    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
-    serverSocket.close();
-
-    EchoMessage message = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-
-    //lazy startup
-    Thread serverThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(1000);
-          server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
-        } catch (Exception e) {
-          fail(e.getMessage());
-        }
-        server.start();
-      }
-    });
-    serverThread.start();
-
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false);
-    client = new BlockingRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-    stub = client.getStub();
-
-    EchoMessage response = stub.echo(null, message);
-    assertEquals(MESSAGE, response.getMessage());
-  }
-
-  @Test
-  public void testConnectionFailed() throws Exception {
-    NettyClientBase client = null;
-    
-    try {
-      int port = server.getListenAddress().getPort() + 1;
-      RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
-              RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)),
-              DummyProtocol.class, false);
-      client = new BlockingRpcClient(rpcConnectionKey, retries);
-      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-      client.close();
-    } catch (Throwable ce){
-      if (client != null) {
-        client.close();
-      }
-      fail();
-    }
-  }
-
-  @Test
-  public void testGetNull() throws Exception {
-    assertNull(stub.getNull(null, null));
-    assertTrue(service.getNullCalled);
-  }
-
-  @Test
-  public void testShutdown() throws Exception {
-    final StringBuilder error = new StringBuilder();
-    Thread callThread = new Thread() {
-      public void run() {
-        try {
-          EchoMessage message = EchoMessage.newBuilder()
-              .setMessage(MESSAGE)
-              .build();
-          stub.deley(null, message);
-        } catch (Exception e) {
-          error.append(e.getMessage());
-        }
-        synchronized(error) {
-          error.notifyAll();
-        }
-      }
-    };
-
-    callThread.start();
-
-    final CountDownLatch latch = new CountDownLatch(1);
-    Thread shutdownThread = new Thread() {
-      public void run() {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-        }
-        try {
-          server.shutdown();
-          server = null;
-          latch.countDown();
-        } catch (Throwable e) {
-          e.printStackTrace();
-        }
-      }
-    };
-    shutdownThread.start();
-
-    assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS));
-
-    assertTrue(latch.getCount() == 0);
-
-    synchronized(error) {
-      error.wait(5 * 1000);
-    }
-
-    if(!error.toString().isEmpty()) {
-      fail(error.toString());
-    }
-  }
-
-  @Test
-  @SetupRpcConnection(setupRpcClient=false)
-  public void testUnresolvedAddress() throws Exception {
-    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
-              RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
-    client = new BlockingRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-    BlockingInterface stub = client.getStub();
-
-    EchoMessage message = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    EchoMessage response2 = stub.echo(null, message);
-    assertEquals(MESSAGE, response2.getMessage());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
deleted file mode 100644
index 0ca7563..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.test.impl;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-
-public class DummyProtocolAsyncImpl implements Interface {
-  private static final Log LOG =
-      LogFactory.getLog(DummyProtocolAsyncImpl.class);
-  public boolean getNullCalled = false;
-  public boolean getErrorCalled = false;
-
-  @Override
-  public void sum(RpcController controller, SumRequest request,
-                  RpcCallback<SumResponse> done) {
-
-    SumResponse response = SumResponse.newBuilder().setResult(
-        request.getX1()+request.getX2()+request.getX3()+request.getX4()
-    ).build();
-    done.run(response);
-  }
-
-  @Override
-  public void echo(RpcController controller, EchoMessage request,
-                   RpcCallback<EchoMessage> done) {
-
-    done.run(request);
-  }
-
-  @Override
-  public void getError(RpcController controller, EchoMessage request,
-                       RpcCallback<EchoMessage> done) {
-    LOG.info("noCallback is called");
-    getErrorCalled = true;
-    controller.setFailed(request.getMessage());
-    done.run(request);
-  }
-
-  @Override
-  public void getNull(RpcController controller, EchoMessage request,
-                      RpcCallback<EchoMessage> done) {
-    getNullCalled = true;
-    LOG.info("noCallback is called");
-    done.run(null);
-  }
-
-  @Override
-  public void deley(RpcController controller, EchoMessage request,
-                    RpcCallback<EchoMessage> done) {
-    try {
-      Thread.sleep(3000);
-    } catch (InterruptedException e) {
-      LOG.error(e.getMessage());
-    }
-
-    done.run(request);
-  }
-
-  public void throwException(RpcController controller, EchoMessage request,
-                             RpcCallback<EchoMessage> done) {
-    done.run(request);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
deleted file mode 100644
index 8d4b597..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.test.impl;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-
-public class DummyProtocolBlockingImpl implements BlockingInterface {
-  private static final Log LOG =
-      LogFactory.getLog(DummyProtocolBlockingImpl.class);
-  public boolean getNullCalled = false;
-  public boolean getErrorCalled = false;
-
-  @Override
-  public SumResponse sum(RpcController controller, SumRequest request)
-      throws ServiceException {
-    return SumResponse.newBuilder().setResult(
-        request.getX1()+request.getX2()+request.getX3()+request.getX4()
-    ).build();
-  }
-
-  @Override
-  public EchoMessage echo(RpcController controller, EchoMessage request)
-      throws ServiceException {
-    return EchoMessage.newBuilder().
-        setMessage(request.getMessage()).build();
-  }
-
-  @Override
-  public EchoMessage getError(RpcController controller, EchoMessage request)
-      throws ServiceException {
-    getErrorCalled = true;
-    controller.setFailed(request.getMessage());
-    return request;
-  }
-
-  @Override
-  public EchoMessage getNull(RpcController controller, EchoMessage request)
-      throws ServiceException {
-    getNullCalled = true;
-    LOG.info("noCallback is called");
-    return null;
-  }
-
-  @Override
-  public EchoMessage deley(RpcController controller, EchoMessage request)
-      throws ServiceException {
-    try {
-      Thread.sleep(3000);
-    } catch (InterruptedException e) {
-      //throw new ServiceException(e.getMessage(), e);
-    }
-
-    return request;
-  }
-
-  public EchoMessage throwException(RpcController controller, EchoMessage request)
-      throws ServiceException {
-    throw new ServiceException("Exception Test");
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/pom.xml b/tajo-rpc/tajo-rpc-common/pom.xml
new file mode 100644
index 0000000..2b1cd7a
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/pom.xml
@@ -0,0 +1,216 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <groupId>org.apache.tajo</groupId>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <packaging>jar</packaging>
+  <artifactId>tajo-rpc-common</artifactId>
+  <name>Tajo Rpc Common</name>
+  <description>Common Implementation for Netty</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+        </configuration>
+        <executions>
+          <execution>
+            <id>create-jar</id>
+            <phase>prepare-package</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+			</url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <target>
+                    <echo file="${project.build.directory}/dist-layout-stitching.sh">
+                      run() {
+                      echo "\$ ${@}"
+                      "${@}"
+                      res=$?
+                      if [ $res != 0 ]; then
+                      echo
+                      echo "Failed!"
+                      echo
+                      exit $res
+                      fi
+                      }
+
+                      ROOT=`cd ${basedir}/..;pwd`
+                      echo
+                      echo "Current directory `pwd`"
+                      echo
+                      run rm -rf ${project.artifactId}-${project.version}
+                      run mkdir ${project.artifactId}-${project.version}
+                      run cd ${project.artifactId}-${project.version}
+                      run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+                      echo
+                      echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+                      echo
+                    </echo>
+                    <exec executable="sh" dir="${project.build.directory}" failonerror="true">
+                      <arg line="./dist-layout-stitching.sh" />
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
new file mode 100644
index 0000000..ad443d7
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Base class for netty implementation.
+ */
+public class NettyServerBase {
+  private static final Log LOG = LogFactory.getLog(NettyServerBase.class);
+  private static final String DEFAULT_PREFIX = "RpcServer_";
+  private static final AtomicInteger sequenceId = new AtomicInteger(0);
+
+  protected String serviceName;
+  protected InetSocketAddress serverAddr;
+  protected InetSocketAddress bindAddress;
+  protected ChannelInitializer<Channel> initializer;
+  protected ServerBootstrap bootstrap;
+  protected ChannelFuture channelFuture;
+  protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+  private InetSocketAddress initIsa;
+  private Set<RpcEventListener> listeners = Collections.synchronizedSet(new HashSet<RpcEventListener>());
+
+  public NettyServerBase(InetSocketAddress address) {
+    this.initIsa = address;
+  }
+
+  public NettyServerBase(String serviceName, InetSocketAddress addr) {
+    this.serviceName = serviceName;
+    this.initIsa = addr;
+  }
+
+  public void setName(String name) {
+    this.serviceName = name;
+  }
+
+  public void init(ChannelInitializer<Channel> initializer, int workerNum) {
+    for (RpcEventListener listener: listeners) {
+      listener.onBeforeInit(this);
+    }
+    
+    bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
+
+    this.initializer = initializer;
+    bootstrap
+      .channel(NioServerSocketChannel.class)
+      .childHandler(initializer)
+      .option(ChannelOption.SO_REUSEADDR, true)
+      .option(ChannelOption.TCP_NODELAY, true)
+      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      .childOption(ChannelOption.TCP_NODELAY, true)
+      .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+      .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10);
+    
+    for (RpcEventListener listener: listeners) {
+      listener.onAfterInit(this);
+    }
+  }
+
+  public InetSocketAddress getListenAddress() {
+    return this.bindAddress;
+  }
+
+  public void start() {
+    for (RpcEventListener listener: listeners) {
+      listener.onBeforeStart(this);
+    }
+    
+    if (serviceName == null) {
+      this.serviceName = getNextDefaultServiceName();
+    }
+
+    if (initIsa.getPort() == 0) {
+      try {
+        int port = getUnusedPort();
+        serverAddr = new InetSocketAddress(initIsa.getHostName(), port);
+      } catch (IOException e) {
+        LOG.error(e, e);
+      }
+    } else {
+      serverAddr = initIsa;
+    }
+
+    this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly();
+    this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress();
+
+    for (RpcEventListener listener: listeners) {
+      listener.onAfterStart(this);
+    }
+    LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress);
+  }
+
+  public Channel getChannel() {
+    return this.channelFuture.channel();
+  }
+
+  public void shutdown() {
+    shutdown(false);
+  }
+
+  public void shutdown(boolean waitUntilThreadsStop) {
+    for (RpcEventListener listener: listeners) {
+      listener.onBeforeShutdown(this);
+    }
+    
+    try {
+      accepted.close();
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+    }
+
+    if(bootstrap != null) {
+      if (bootstrap.childGroup() != null) {
+        bootstrap.childGroup().shutdownGracefully();
+        if (waitUntilThreadsStop) {
+          bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
+        }
+      }
+
+      if (bootstrap.group() != null) {
+        bootstrap.group().shutdownGracefully();
+        if (waitUntilThreadsStop) {
+          bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
+        }
+      }
+    }
+    
+    for (RpcEventListener listener: listeners) {
+      listener.onAfterShutdown(this);
+    }
+
+    if (bindAddress != null) {
+      LOG.info("Rpc (" + serviceName + ") listened on "
+          + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+    }
+  }
+
+  private static String getNextDefaultServiceName() {
+    return DEFAULT_PREFIX + sequenceId.getAndIncrement();
+  }
+
+  private static final int startPortRange = 10000;
+  private static final int endPortRange = 50000;
+  private static final Random rnd = new Random(System.currentTimeMillis());
+  // each system has a different starting port number within the given range.
+  private static final AtomicInteger nextPortNum =
+      new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange));
+  private static final Object lockObject = new Object();
+
+
+  private synchronized static int getUnusedPort() throws IOException {
+    while (true) {
+      int port = nextPortNum.getAndIncrement();
+      if (port >= endPortRange) {
+        synchronized (lockObject) {
+          nextPortNum.set(startPortRange);
+          port = nextPortNum.getAndIncrement();
+        }
+      }
+      if (available(port)) {
+        return port;
+      }
+    }
+  }
+
+  private static boolean available(int port) throws IOException {
+    if (port < 1024 || port > 65535) {
+      throw new IllegalArgumentException("Port Number Out of Bound: " + port);
+    }
+
+    ServerSocket ss = null;
+    DatagramSocket ds = null;
+
+    try {
+      ss = new ServerSocket(port);
+      ss.setReuseAddress(true);
+
+      ds = new DatagramSocket(port);
+      ds.setReuseAddress(true);
+
+      return true;
+
+    } catch (IOException e) {
+      return false;
+    } finally {
+      if (ss != null) {
+        ss.close();
+      }
+
+      if (ds != null) {
+        ds.close();
+      }
+    }
+  }
+  
+  public void addListener(RpcEventListener listener) {
+    listeners.add(listener);
+  }
+  
+  public void removeListener(RpcEventListener listener) {
+    listeners.remove(listener);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java
new file mode 100644
index 0000000..30c110d
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+/**
+ * Unchecked exception of the RPC layer. It adds nothing to {@link RuntimeException}
+ * beyond its type; each constructor forwards its arguments to the superclass.
+ */
+public class RemoteException extends RuntimeException {
+  // Exceptions are Serializable; pin the stream version instead of relying on the
+  // compiler-dependent default.
+  private static final long serialVersionUID = 1L;
+
+  /** Creates an exception with neither message nor cause. */
+  public RemoteException() {
+    super();
+  }
+
+  /**
+   * @param message detail message
+   */
+  public RemoteException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param t cause of this exception
+   */
+  public RemoteException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * @param message detail message
+   * @param t cause of this exception
+   */
+  public RemoteException(String message, Throwable t) {
+    super(message, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
new file mode 100644
index 0000000..3c054ad
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+public class RetriesExhaustedException extends RuntimeException {
+  private static final long serialVersionUID = 1876775844L;
+
+  /**
+   * Pairs a throwable with the time it happened and a free-form note.
+   */
+  public static class ThrowableWithExtraContext {
+    private final Throwable t;
+    private final long when;
+    private final String extras;
+
+    public ThrowableWithExtraContext(final Throwable t, final long when,
+        final String extras) {
+      this.t = t;
+      this.when = when;
+      this.extras = extras;
+    }
+
+    /**
+     * @return the timestamp rendered as a {@link Date}, the note and the throwable,
+     *         separated by {@code ", "}
+     */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder();
+      text.append(new Date(when));
+      text.append(", ").append(extras);
+      text.append(", ").append(t.toString());
+      return text.toString();
+    }
+  }
+
+  /**
+   * @param msg detail message
+   */
+  public RetriesExhaustedException(final String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg detail message
+   * @param e I/O failure recorded as the cause
+   */
+  public RetriesExhaustedException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
+  /**
+   * Builds the exception from the failures collected while retrying.
+   * @param callableVitals text naming what was being contacted; it is placed after
+   * "Failed contacting" in the message
+   * @param numTries try counter; the message reports this value plus one
+   * @param exceptions failures seen before giving up, listed one per line
+   */
+  public RetriesExhaustedException(final String callableVitals, int numTries,
+      List<Throwable> exceptions) {
+    super(getMessage(callableVitals, numTries, exceptions));
+  }
+
+  /**
+   * Builds the exception from the failures collected while retrying.
+   * @param numTries try counter; the message reports this value plus one
+   * @param exceptions failures seen before giving up, listed one per line
+   */
+  public RetriesExhaustedException(final int numTries,
+      final List<Throwable> exceptions) {
+    super(getMessage(numTries, exceptions));
+  }
+
+  // Message for the variant that names the contacted target.
+  private static String getMessage(String callableVitals, int numTries,
+      List<Throwable> exceptions) {
+    StringBuilder text = new StringBuilder("Failed contacting ")
+        .append(callableVitals)
+        .append(" after ")
+        .append(numTries + 1)
+        .append(" attempts.\nExceptions:\n");
+    return appendFailures(text, exceptions).toString();
+  }
+
+  // Message for the variant that only knows the number of tries.
+  private static String getMessage(final int numTries,
+      final List<Throwable> exceptions) {
+    StringBuilder text = new StringBuilder("Failed after attempts=")
+        .append(numTries + 1)
+        .append(", exceptions:\n");
+    return appendFailures(text, exceptions).toString();
+  }
+
+  // Appends each failure's toString() followed by a line break.
+  private static StringBuilder appendFailures(StringBuilder text,
+      List<Throwable> failures) {
+    for (Throwable failure : failures) {
+      text.append(failure.toString()).append("\n");
+    }
+    return text;
+  }
+}
\ No newline at end of file


Mime
View raw message