tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject [5/5] tajo git commit: TAJO-1337: Implements common modules to handle RESTful API
Date Wed, 18 Mar 2015 14:57:59 GMT
TAJO-1337: Implements common modules to handle RESTful API

Closes #399


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a9ae3cab
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a9ae3cab
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a9ae3cab

Branch: refs/heads/master
Commit: a9ae3cab69526294475a771014e9c0e49c80462b
Parents: 82d44af
Author: Jihun Kang <jihun@apache.org>
Authored: Wed Mar 18 23:57:06 2015 +0900
Committer: Jihun Kang <jihun@apache.org>
Committed: Wed Mar 18 23:57:06 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 tajo-catalog/tajo-catalog-client/pom.xml        |   2 +-
 .../tajo-catalog-drivers/tajo-hcatalog/pom.xml  |   2 +-
 tajo-catalog/tajo-catalog-server/pom.xml        |   2 +-
 tajo-cli/pom.xml                                |   2 +-
 tajo-client/pom.xml                             |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 +
 tajo-core/pom.xml                               |   2 +-
 tajo-dist/pom.xml                               |   7 +-
 tajo-project/pom.xml                            |  31 ++
 tajo-pullserver/pom.xml                         |   2 +-
 tajo-rpc/pom.xml                                | 180 ++--------
 .../org/apache/tajo/rpc/AsyncRpcClient.java     | 227 ------------
 .../org/apache/tajo/rpc/AsyncRpcServer.java     | 148 --------
 .../org/apache/tajo/rpc/BlockingRpcClient.java  | 273 ---------------
 .../org/apache/tajo/rpc/BlockingRpcServer.java  | 147 --------
 .../java/org/apache/tajo/rpc/CallFuture.java    |  84 -----
 .../apache/tajo/rpc/DefaultRpcController.java   |  65 ----
 .../org/apache/tajo/rpc/NettyClientBase.java    | 221 ------------
 .../org/apache/tajo/rpc/NettyRpcController.java |  63 ----
 .../org/apache/tajo/rpc/NettyServerBase.java    | 205 -----------
 .../java/org/apache/tajo/rpc/NullCallback.java  |  38 --
 .../tajo/rpc/ProtoChannelInitializer.java       |  50 ---
 .../apache/tajo/rpc/RemoteCallException.java    |  69 ----
 .../org/apache/tajo/rpc/RemoteException.java    |  37 --
 .../tajo/rpc/RetriesExhaustedException.java     | 104 ------
 .../org/apache/tajo/rpc/RpcChannelFactory.java  | 182 ----------
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 190 ----------
 .../main/java/org/apache/tajo/rpc/RpcUtils.java | 122 -------
 .../org/apache/tajo/rpc/ServerCallable.java     | 162 ---------
 .../apache/tajo/rpc/TajoServiceException.java   |  58 ---
 tajo-rpc/src/main/proto/DummyProtos.proto       |  47 ---
 tajo-rpc/src/main/proto/RpcProtos.proto         |  32 --
 tajo-rpc/src/main/proto/TestProtocol.proto      |  31 --
 tajo-rpc/src/main/proto/TestProtos.proto        |  35 --
 tajo-rpc/src/test/java/log4j.properties         |  25 --
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  | 345 ------------------
 .../org/apache/tajo/rpc/TestBlockingRpc.java    | 349 -------------------
 .../rpc/test/impl/DummyProtocolAsyncImpl.java   |  86 -----
 .../test/impl/DummyProtocolBlockingImpl.java    |  83 -----
 tajo-rpc/tajo-rpc-common/pom.xml                | 216 ++++++++++++
 .../org/apache/tajo/rpc/NettyServerBase.java    | 243 +++++++++++++
 .../org/apache/tajo/rpc/RemoteException.java    |  37 ++
 .../tajo/rpc/RetriesExhaustedException.java     | 104 ++++++
 .../org/apache/tajo/rpc/RpcChannelFactory.java  | 182 ++++++++++
 .../org/apache/tajo/rpc/RpcEventListener.java   |  62 ++++
 .../main/java/org/apache/tajo/rpc/RpcUtils.java | 122 +++++++
 tajo-rpc/tajo-rpc-protobuf/pom.xml              | 274 +++++++++++++++
 .../org/apache/tajo/rpc/AsyncRpcClient.java     | 227 ++++++++++++
 .../org/apache/tajo/rpc/AsyncRpcServer.java     | 148 ++++++++
 .../org/apache/tajo/rpc/BlockingRpcClient.java  | 273 +++++++++++++++
 .../org/apache/tajo/rpc/BlockingRpcServer.java  | 147 ++++++++
 .../java/org/apache/tajo/rpc/CallFuture.java    |  84 +++++
 .../apache/tajo/rpc/DefaultRpcController.java   |  65 ++++
 .../org/apache/tajo/rpc/NettyClientBase.java    | 221 ++++++++++++
 .../org/apache/tajo/rpc/NettyRpcController.java |  63 ++++
 .../java/org/apache/tajo/rpc/NullCallback.java  |  38 ++
 .../tajo/rpc/ProtoChannelInitializer.java       |  50 +++
 .../apache/tajo/rpc/RemoteCallException.java    |  69 ++++
 .../org/apache/tajo/rpc/RemoteException.java    |  37 ++
 .../tajo/rpc/RetriesExhaustedException.java     | 104 ++++++
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 190 ++++++++++
 .../org/apache/tajo/rpc/ServerCallable.java     | 162 +++++++++
 .../apache/tajo/rpc/TajoServiceException.java   |  58 +++
 .../src/main/proto/DummyProtos.proto            |  47 +++
 .../src/main/proto/RpcProtos.proto              |  32 ++
 .../src/main/proto/TestProtocol.proto           |  31 ++
 .../src/main/proto/TestProtos.proto             |  35 ++
 .../src/test/java/log4j.properties              |  25 ++
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  | 345 ++++++++++++++++++
 .../org/apache/tajo/rpc/TestBlockingRpc.java    | 349 +++++++++++++++++++
 .../rpc/test/impl/DummyProtocolAsyncImpl.java   |  86 +++++
 .../test/impl/DummyProtocolBlockingImpl.java    |  83 +++++
 tajo-rpc/tajo-ws-rs/pom.xml                     | 218 ++++++++++++
 .../rs/netty/NettyRestChannelInitializer.java   |  50 +++
 .../ws/rs/netty/NettyRestHandlerContainer.java  | 319 +++++++++++++++++
 .../NettyRestHandlerContainerProvider.java      |  42 +++
 .../tajo/ws/rs/netty/NettyRestServer.java       |  67 ++++
 .../ws/rs/netty/NettyRestServerFactory.java     |  89 +++++
 .../ws/rs/netty/NettyRestServerListener.java    |  72 ++++
 .../tajo/ws/rs/netty/gson/GsonFeature.java      |  34 ++
 .../tajo/ws/rs/netty/gson/GsonReader.java       |  52 +++
 .../apache/tajo/ws/rs/netty/gson/GsonUtil.java  |  32 ++
 .../tajo/ws/rs/netty/gson/GsonWriter.java       |  59 ++++
 .../NettyRestHandlerContainerProviderTest.java  |  66 ++++
 .../tajo/ws/rs/netty/NettyRestServerTest.java   | 137 ++++++++
 .../ws/rs/netty/testapp1/TestApplication1.java  |  38 ++
 .../ws/rs/netty/testapp1/TestResource1.java     |  36 ++
 .../ws/rs/netty/testapp2/DirectoriesDao.java    |  39 +++
 .../rs/netty/testapp2/DirectoriesResource.java  |  85 +++++
 .../tajo/ws/rs/netty/testapp2/Directory.java    |  52 +++
 .../testapp2/FileManagementApplication.java     |  35 ++
 92 files changed, 5809 insertions(+), 3636 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c3f2691..4875cab 100644
--- a/CHANGES
+++ b/CHANGES
@@ -76,6 +76,8 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1337: Implements common modules to handle RESTful API. (jihun)
+
     TAJO-1329: Improve Schema class to support nested struct support.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-catalog/tajo-catalog-client/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/pom.xml b/tajo-catalog/tajo-catalog-client/pom.xml
index 98b85a8..84e2aa3 100644
--- a/tajo-catalog/tajo-catalog-client/pom.xml
+++ b/tajo-catalog/tajo-catalog-client/pom.xml
@@ -135,7 +135,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml
index fe8f34a..7c3efdd 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml
@@ -109,7 +109,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-catalog/tajo-catalog-server/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml
index 501f9af..8efeecf 100644
--- a/tajo-catalog/tajo-catalog-server/pom.xml
+++ b/tajo-catalog/tajo-catalog-server/pom.xml
@@ -141,7 +141,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-cli/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-cli/pom.xml b/tajo-cli/pom.xml
index 684c298..e8360ad 100644
--- a/tajo-cli/pom.xml
+++ b/tajo-cli/pom.xml
@@ -140,7 +140,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>commons-cli</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-client/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml
index 692e1b5..e6be476 100644
--- a/tajo-client/pom.xml
+++ b/tajo-client/pom.xml
@@ -195,7 +195,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 4ed8097..5b569d5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -249,6 +249,8 @@ public class TajoConf extends Configuration {
         Runtime.getRuntime().availableProcessors() * 1),
     WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.service.rpc.server.worker-thread-num",
         Runtime.getRuntime().availableProcessors() * 1),
+    REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.rest.service.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 1),
 
     // Task Configuration -----------------------------------------------------
     TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 38bddec..61a156b 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -277,7 +277,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index aed7b4b..da5f48f 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -60,7 +60,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-ws-rs</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 9f1b1ab..37121e3 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -38,6 +38,7 @@
     <tajo.version>0.11.0-SNAPSHOT</tajo.version>
     <hbase.version>0.98.7-hadoop2</hbase.version>
     <netty.version>4.0.25.Final</netty.version>
+    <jersey.version>2.6</jersey.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
     <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
   </properties>
@@ -789,6 +790,21 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-rpc-common</artifactId>
+        <version>${tajo.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-rpc-protobuf</artifactId>
+        <version>${tajo.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-ws-rs</artifactId>
+        <version>${tajo.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
         <artifactId>tajo-algebra</artifactId>
         <version>${tajo.version}</version>
       </dependency>
@@ -1063,6 +1079,21 @@
         <artifactId>jcip-annotations</artifactId>
         <version>1.0-1</version>
       </dependency>
+      <dependency>
+        <groupId>org.glassfish.jersey.core</groupId>
+        <artifactId>jersey-common</artifactId>
+        <version>${jersey.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.glassfish.jersey.core</groupId>
+        <artifactId>jersey-server</artifactId>
+        <version>${jersey.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>javax.ws.rs</groupId>
+        <artifactId>javax.ws.rs-api</artifactId>
+        <version>2.0.1</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index 944cf3d..ba6e6b7 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -56,7 +56,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-rpc</artifactId>
+      <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 8c626b4..f069aca 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -24,165 +24,39 @@
     <groupId>org.apache.tajo</groupId>
     <relativePath>../tajo-project</relativePath>
   </parent>
-  <packaging>jar</packaging>
   <artifactId>tajo-rpc</artifactId>
-  <name>Tajo Rpc</name>
-  <description>RPC Server/Client Implementation based on Netty and Protocol Buffer</description>
+  <packaging>pom</packaging>
+  <name>Tajo RPC</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <modules>
+    <module>tajo-rpc-common</module>
+    <module>tajo-rpc-protobuf</module>
+    <module>tajo-ws-rs</module>
+  </modules>
 
   <build>
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
-          <encoding>${project.build.sourceEncoding}</encoding>
-        </configuration>
-      </plugin>
-      <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>verify</phase>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>2.4</version>
-        <configuration>
-        </configuration>
-        <executions>
-          <execution>
-            <id>create-jar</id>
-            <phase>prepare-package</phase>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>create-protobuf-generated-sources-directory</id>
-            <phase>initialize</phase>
-            <configuration>
-              <target>
-                <mkdir dir="target/generated-sources/proto" />
-              </target>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>1.2</version>
-        <executions>
-          <execution>
-            <id>generate-sources</id>
-            <phase>generate-sources</phase>
-            <configuration>
-              <executable>protoc</executable>
-              <arguments>
-                <argument>-Isrc/main/proto/</argument>
-                <argument>--java_out=target/generated-sources/proto</argument>
-                <argument>src/main/proto/DummyProtos.proto</argument>
-                <argument>src/main/proto/RpcProtos.proto</argument>
-                <argument>src/main/proto/TestProtos.proto</argument>
-                <argument>src/main/proto/TestProtocol.proto</argument>
-              </arguments>
-            </configuration>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>1.5</version>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>target/generated-sources/proto</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
+        <artifactId>maven-surefire-report-plugin</artifactId>
       </plugin>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-report-plugin</artifactId>
-        <version>2.15</version>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
       </plugin>
     </plugins>
   </build>
 
-  <dependencies>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-transport</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-codec</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-handler</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-  <repositories>
-    <repository>
-      <id>repository.jboss.org</id>
-      <url>https://repository.jboss.org/nexus/content/repositories/releases/
-			</url>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-  </repositories>
 
   <profiles>
     <profile>
@@ -216,6 +90,9 @@
       <id>dist</id>
       <activation>
         <activeByDefault>false</activeByDefault>
+        <property>
+          <name>tar|rpm|deb</name>
+        </property>
       </activation>
       <build>
         <plugins>
@@ -225,7 +102,7 @@
             <executions>
               <execution>
                 <id>dist</id>
-                <phase>package</phase>
+                <phase>prepare-package</phase>
                 <goals>
                   <goal>run</goal>
                 </goals>
@@ -248,12 +125,15 @@
                       echo
                       echo "Current directory `pwd`"
                       echo
-                      run rm -rf ${project.artifactId}-${project.version}
-                      run mkdir ${project.artifactId}-${project.version}
-                      run cd ${project.artifactId}-${project.version}
-                      run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+                      run rm -rf tajo-rpc-${project.version}
+                      run mkdir tajo-rpc-${project.version}
+                      run cd tajo-rpc-${project.version}
+                      run cp -r ${basedir}/tajo-rpc-common/target/tajo-rpc-common-${project.version}*.jar .
+                      run cp -r ${basedir}/tajo-rpc-protobuf/target/tajo-rpc-protobuf-${project.version}*.jar .
+                      run cp -r ${basedir}/tajo-ws-rs/target/tajo-ws-rs-${project.version}*.jar .
+
                       echo
-                      echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+                      echo "Tajo RPC dist layout available at: ${project.build.directory}/tajo-rpc-${project.version}"
                       echo
                     </echo>
                     <exec executable="sh" dir="${project.build.directory}" failonerror="true">
@@ -274,9 +154,9 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-report-plugin</artifactId>
-        <version>2.15</version>
       </plugin>
     </plugins>
   </reporting>
 
 </project>
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
deleted file mode 100644
index 3d856ce..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.*;
-
-import io.netty.channel.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class AsyncRpcClient extends NettyClientBase {
-  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
-
-  private final ConcurrentMap<Integer, ResponseCallback> requests =
-      new ConcurrentHashMap<Integer, ResponseCallback>();
-
-  private final Method stubMethod;
-  private final ProxyRpcChannel rpcChannel;
-  private final ClientChannelInboundHandler inboundHandler;
-
-  /**
-   * Intentionally make this method package-private, avoiding user directly
-   * new an instance through this constructor.
-   */
-  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
-      throws ClassNotFoundException, NoSuchMethodException {
-    super(rpcConnectionKey, retries);
-    stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
-    rpcChannel = new ProxyRpcChannel();
-    inboundHandler = new ClientChannelInboundHandler();
-    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
-  }
-
-  @Override
-  public <T> T getStub() {
-    return getStub(stubMethod, rpcChannel);
-  }
-
-  protected void sendExceptions(String message) {
-    for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
-      ResponseCallback callback = callbackEntry.getValue();
-      Integer id = callbackEntry.getKey();
-
-      RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
-          .setErrorMessage(message)
-          .setId(id);
-
-      callback.run(responseBuilder.build());
-    }
-  }
-
-  @Override
-  public void close() {
-    sendExceptions("AsyncRpcClient terminates all the connections");
-
-    super.close();
-  }
-
-  private class ProxyRpcChannel implements RpcChannel {
-
-    public void callMethod(final MethodDescriptor method,
-                           final RpcController controller,
-                           final Message param,
-                           final Message responseType,
-                           RpcCallback<Message> done) {
-
-      int nextSeqId = sequence.getAndIncrement();
-
-      Message rpcRequest = buildRequest(nextSeqId, method, param);
-
-      inboundHandler.registerCallback(nextSeqId,
-          new ResponseCallback(controller, responseType, done));
-
-      ChannelPromise channelPromise = getChannel().newPromise();
-      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
-
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
-          }
-        }
-      });
-      getChannel().writeAndFlush(rpcRequest, channelPromise);
-    }
-
-    private Message buildRequest(int seqId,
-                                 MethodDescriptor method,
-                                 Message param) {
-
-      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
-          .setId(seqId)
-          .setMethodName(method.getName());
-
-      if (param != null) {
-          requestBuilder.setRequestMessage(param.toByteString());
-      }
-
-      return requestBuilder.build();
-    }
-  }
-
-  private class ResponseCallback implements RpcCallback<RpcResponse> {
-    private final RpcController controller;
-    private final Message responsePrototype;
-    private final RpcCallback<Message> callback;
-
-    public ResponseCallback(RpcController controller,
-                            Message responsePrototype,
-                            RpcCallback<Message> callback) {
-      this.controller = controller;
-      this.responsePrototype = responsePrototype;
-      this.callback = callback;
-    }
-
-    @Override
-    public void run(RpcResponse rpcResponse) {
-      // if hasErrorMessage is true, it means rpc-level errors.
-      // it does not call the callback function\
-      if (rpcResponse.hasErrorMessage()) {
-        if (controller != null) {
-          this.controller.setFailed(rpcResponse.getErrorMessage());
-        }
-        callback.run(null);
-      } else { // if rpc call succeed
-        try {
-          Message responseMessage;
-          if (!rpcResponse.hasResponseMessage()) {
-            responseMessage = null;
-          } else {
-            responseMessage = responsePrototype.newBuilderForType().mergeFrom(
-                rpcResponse.getResponseMessage()).build();
-          }
-
-          callback.run(responseMessage);
-
-        } catch (InvalidProtocolBufferException e) {
-          throw new RemoteException(getErrorMessage(""), e);
-        }
-      }
-    }
-  }
-
-  private String getErrorMessage(String message) {
-    return "Exception [" + protocol.getCanonicalName() +
-        "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-        getChannel().remoteAddress()) + ")]: " + message;
-  }
-
-  @ChannelHandler.Sharable
-  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
-
-    void registerCallback(int seqId, ResponseCallback callback) {
-
-      if (requests.putIfAbsent(seqId, callback) != null) {
-        throw new RemoteException(
-            getErrorMessage("Duplicate Sequence Id "+ seqId));
-      }
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      if (msg instanceof RpcResponse) {
-        try {
-          RpcResponse response = (RpcResponse) msg;
-          ResponseCallback callback = requests.remove(response.getId());
-
-          if (callback == null) {
-            LOG.warn("Dangling rpc call");
-          } else {
-            callback.run(response);
-          }
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause);
-
-      sendExceptions(cause.getMessage());
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.error(cause.getMessage(), cause);
-      } else {
-        LOG.error("RPC Exception:" + cause.getMessage());
-      }
-      
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
deleted file mode 100644
index 3b5a747..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.*;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-
-import io.netty.channel.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-
-import io.netty.util.ReferenceCountUtil;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-public class AsyncRpcServer extends NettyServerBase {
-  private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
-
-  private final Service service;
-  private final ChannelInitializer<Channel> initializer;
-
-  public AsyncRpcServer(final Class<?> protocol,
-                        final Object instance,
-                        final InetSocketAddress bindAddress,
-                        final int workerNum)
-      throws Exception {
-    super(protocol.getSimpleName(), bindAddress);
-
-    String serviceClassName = protocol.getName() + "$" +
-        protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface");
-    Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
-    this.service = (Service) method.invoke(null, instance);
-
-    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
-    super.init(this.initializer, workerNum);
-  }
-
-  @ChannelHandler.Sharable
-  private class ServerHandler extends ChannelInboundHandlerAdapter {
-
-    @Override
-    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-      accepted.add(ctx.channel());
-      if(LOG.isDebugEnabled()){
-        LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
-      }
-      super.channelRegistered(ctx);
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-      accepted.remove(ctx.channel());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
-      }
-      super.channelUnregistered(ctx);
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      if (msg instanceof RpcRequest) {
-        try {
-          final RpcRequest request = (RpcRequest) msg;
-
-          String methodName = request.getMethodName();
-          MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
-
-          if (methodDescriptor == null) {
-            throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
-          }
-
-          Message paramProto = null;
-          if (request.hasRequestMessage()) {
-            try {
-              paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
-                  .mergeFrom(request.getRequestMessage()).build();
-            } catch (Throwable t) {
-              throw new RemoteCallException(request.getId(), methodDescriptor, t);
-            }
-          }
-
-          final RpcController controller = new NettyRpcController();
-
-          RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() {
-
-            public void run(Message returnValue) {
-
-              RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
-
-              if (returnValue != null) {
-                builder.setResponseMessage(returnValue.toByteString());
-              }
-
-              if (controller.failed()) {
-                builder.setErrorMessage(controller.errorText());
-              }
-
-              ctx.writeAndFlush(builder.build());
-            }
-          };
-
-          service.callMethod(methodDescriptor, controller, paramProto, callback);
-
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception{
-      if (cause instanceof RemoteCallException) {
-        RemoteCallException callException = (RemoteCallException) cause;
-        ctx.writeAndFlush(callException.getResponse());
-      } else {
-        LOG.error(cause.getMessage());
-      }
-      
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
-      }
-    }
-    
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
deleted file mode 100644
index 6a90330..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.*;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-
-import io.netty.channel.*;
-import io.netty.util.concurrent.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-
-import io.netty.util.ReferenceCountUtil;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.Future;
-
-public class BlockingRpcClient extends NettyClientBase {
-  private static final Log LOG = LogFactory.getLog(RpcProtos.class);
-
-  private final Map<Integer, ProtoCallFuture> requests =
-      new ConcurrentHashMap<Integer, ProtoCallFuture>();
-
-  private final Method stubMethod;
-  private final ProxyRpcChannel rpcChannel;
-  private final ChannelInboundHandlerAdapter inboundHandler;
-
-  /**
-   * Intentionally make this method package-private, avoiding user directly
-   * new an instance through this constructor.
-   */
-  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
-      throws ClassNotFoundException, NoSuchMethodException {
-    super(rpcConnectionKey, retries);
-    stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
-    rpcChannel = new ProxyRpcChannel();
-    inboundHandler = new ClientChannelInboundHandler();
-    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
-  }
-
-  @Override
-  public <T> T getStub() {
-    return getStub(stubMethod, rpcChannel);
-  }
-
-  @Override
-  public void close() {
-    for(ProtoCallFuture callback: requests.values()) {
-      callback.setFailed("BlockingRpcClient terminates all the connections",
-          new ServiceException("BlockingRpcClient terminates all the connections"));
-    }
-
-    super.close();
-  }
-
-  private class ProxyRpcChannel implements BlockingRpcChannel {
-
-    @Override
-    public Message callBlockingMethod(final MethodDescriptor method,
-                                      final RpcController controller,
-                                      final Message param,
-                                      final Message responsePrototype)
-        throws TajoServiceException {
-
-      int nextSeqId = sequence.getAndIncrement();
-
-      Message rpcRequest = buildRequest(nextSeqId, method, param);
-
-      ProtoCallFuture callFuture =
-          new ProtoCallFuture(controller, responsePrototype);
-      requests.put(nextSeqId, callFuture);
-
-      ChannelPromise channelPromise = getChannel().newPromise();
-      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
-          }
-        }
-      });
-      getChannel().writeAndFlush(rpcRequest, channelPromise);
-
-      try {
-        return callFuture.get(60, TimeUnit.SECONDS);
-      } catch (Throwable t) {
-        if (t instanceof ExecutionException) {
-          Throwable cause = t.getCause();
-          if (cause != null && cause instanceof TajoServiceException) {
-            throw (TajoServiceException)cause;
-          }
-        }
-        throw new TajoServiceException(t.getMessage());
-      }
-    }
-
-    private Message buildRequest(int seqId,
-                                 MethodDescriptor method,
-                                 Message param) {
-      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
-          .setId(seqId)
-          .setMethodName(method.getName());
-
-      if (param != null) {
-        requestBuilder.setRequestMessage(param.toByteString());
-      }
-
-      return requestBuilder.build();
-    }
-  }
-
-  private String getErrorMessage(String message) {
-    if(getChannel() != null) {
-      return protocol.getName() +
-          "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-          getChannel().remoteAddress()) + "): " + message;
-    } else {
-      return "Exception " + message;
-    }
-  }
-
-  private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
-    if(getChannel() != null) {
-      return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
-          RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
-    } else {
-      return new TajoServiceException(response.getErrorMessage());
-    }
-  }
-
-  @ChannelHandler.Sharable
-  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-
-      if (msg instanceof RpcResponse) {
-        try {
-          RpcResponse rpcResponse = (RpcResponse) msg;
-          ProtoCallFuture callback = requests.remove(rpcResponse.getId());
-
-          if (callback == null) {
-            LOG.warn("Dangling rpc call");
-          } else {
-            if (rpcResponse.hasErrorMessage()) {
-              callback.setFailed(rpcResponse.getErrorMessage(),
-                  makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
-              throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
-            } else {
-              Message responseMessage;
-
-              if (!rpcResponse.hasResponseMessage()) {
-                responseMessage = null;
-              } else {
-                responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
-                    .build();
-              }
-
-              callback.setResponse(responseMessage);
-            }
-          }
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      for(ProtoCallFuture callback: requests.values()) {
-        callback.setFailed(cause.getMessage(), cause);
-      }
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.error("" + cause.getMessage(), cause);
-      } else {
-        LOG.error("RPC Exception:" + cause.getMessage());
-      }
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
-      }
-    }
-  }
-
- static class ProtoCallFuture implements Future<Message> {
-    private Semaphore sem = new Semaphore(0);
-    private Message response = null;
-    private Message returnType;
-
-    private RpcController controller;
-
-    private ExecutionException ee;
-
-    public ProtoCallFuture(RpcController controller, Message message) {
-      this.controller = controller;
-      this.returnType = message;
-    }
-
-    @Override
-    public boolean cancel(boolean arg0) {
-      return false;
-    }
-
-    @Override
-    public Message get() throws InterruptedException, ExecutionException {
-      sem.acquire();
-      if(ee != null) {
-        throw ee;
-      }
-      return response;
-    }
-
-    @Override
-    public Message get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      if(sem.tryAcquire(timeout, unit)) {
-        if (ee != null) {
-          throw ee;
-        }
-        return response;
-      } else {
-        throw new TimeoutException();
-      }
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public boolean isDone() {
-      return sem.availablePermits() > 0;
-    }
-
-    public void setResponse(Message response) {
-      this.response = response;
-      sem.release();
-    }
-
-    public void setFailed(String errorText, Throwable t) {
-      if(controller != null) {
-        this.controller.setFailed(errorText);
-      }
-      ee = new ExecutionException(errorText, t);
-      sem.release();
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
deleted file mode 100644
index 0ce359f..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-
-import io.netty.channel.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-
-import io.netty.util.ReferenceCountUtil;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-public class BlockingRpcServer extends NettyServerBase {
-  private static Log LOG = LogFactory.getLog(BlockingRpcServer.class);
-  private final BlockingService service;
-  private final ChannelInitializer<Channel> initializer;
-
-  public BlockingRpcServer(final Class<?> protocol,
-                           final Object instance,
-                           final InetSocketAddress bindAddress,
-                           final int workerNum)
-      throws Exception {
-
-    super(protocol.getSimpleName(), bindAddress);
-
-    String serviceClassName = protocol.getName() + "$" +
-        protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    Class<?> interfaceClass = Class.forName(serviceClassName +
-        "$BlockingInterface");
-    Method method = serviceClass.getMethod(
-        "newReflectiveBlockingService", interfaceClass);
-
-    this.service = (BlockingService) method.invoke(null, instance);
-    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
-
-    super.init(this.initializer, workerNum);
-  }
-
-  @ChannelHandler.Sharable
-  private class ServerHandler extends ChannelInboundHandlerAdapter {
-
-    @Override
-    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-      accepted.add(ctx.channel());
-      if(LOG.isDebugEnabled()){
-        LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
-      }
-      super.channelRegistered(ctx);
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-      accepted.remove(ctx.channel());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
-      }
-      super.channelUnregistered(ctx);
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-
-      if (msg instanceof RpcRequest) {
-        try {
-          final RpcRequest request = (RpcRequest) msg;
-
-          String methodName = request.getMethodName();
-          MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
-
-          if (methodDescriptor == null) {
-            throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
-          }
-          Message paramProto = null;
-          if (request.hasRequestMessage()) {
-            try {
-              paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
-                  .mergeFrom(request.getRequestMessage()).build();
-
-            } catch (Throwable t) {
-              throw new RemoteCallException(request.getId(), methodDescriptor, t);
-            }
-          }
-          Message returnValue;
-          RpcController controller = new NettyRpcController();
-
-          try {
-            returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
-          } catch (Throwable t) {
-            throw new RemoteCallException(request.getId(), methodDescriptor, t);
-          }
-
-          RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
-
-          if (returnValue != null) {
-            builder.setResponseMessage(returnValue.toByteString());
-          }
-
-          if (controller.failed()) {
-            builder.setErrorMessage(controller.errorText());
-          }
-          ctx.writeAndFlush(builder.build());
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-      if (cause instanceof RemoteCallException) {
-        RemoteCallException callException = (RemoteCallException) cause;
-        ctx.writeAndFlush(callException.getResponse());
-      }
-      
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
-      }
-    }
-    
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
deleted file mode 100644
index c4c3256..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public class CallFuture<T> implements RpcCallback<T>, Future<T> {
-
-  private final Semaphore sem = new Semaphore(0);
-  private boolean done = false;
-  private T response;
-  private RpcController controller;
-
-  public CallFuture() {
-    controller = new DefaultRpcController();
-  }
-
-  public RpcController getController() {
-    return controller;
-  }
-
-  @Override
-  public void run(T t) {
-    this.response = t;
-    done = true;
-    sem.release();
-  }
-
-  @Override
-  public boolean cancel(boolean mayInterruptIfRunning) {
-    controller.startCancel();
-    sem.release();
-    return controller.isCanceled();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return controller.isCanceled();
-  }
-
-  @Override
-  public boolean isDone() {
-    return done;
-  }
-
-  @Override
-  public T get() throws InterruptedException {
-    sem.acquire();
-
-    return response;
-  }
-
-  @Override
-  public T get(long timeout, TimeUnit unit)
-      throws InterruptedException, TimeoutException {
-    if (sem.tryAcquire(timeout, unit)) {
-      return response;
-    } else {
-      throw new TimeoutException();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
deleted file mode 100644
index 4ba19a5..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-public class DefaultRpcController implements RpcController {
-  private String errorText;
-  private boolean error;
-  private boolean canceled;
-
-  @Override
-  public void reset() {
-    errorText = "";
-    error = false;
-    canceled = false;
-  }
-
-  @Override
-  public boolean failed() {
-    return error;
-  }
-
-  @Override
-  public String errorText() {
-    return errorText;
-  }
-
-  @Override
-  public void startCancel() {
-    this.canceled = true;
-  }
-
-  @Override
-  public void setFailed(String s) {
-    this.errorText = s;
-    this.error = true;
-  }
-
-  @Override
-  public boolean isCanceled() {
-    return canceled;
-  }
-
-  @Override
-  public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
deleted file mode 100644
index 72278f2..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import io.netty.channel.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.io.Closeable;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public abstract class NettyClientBase implements Closeable {
-  private static final Log LOG = LogFactory.getLog(NettyClientBase.class);
-  private static final int CONNECTION_TIMEOUT = 60000;  // 60 sec
-  private static final long PAUSE = 1000; // 1 sec
-
-  private final int numRetries;
-
-  private Bootstrap bootstrap;
-  private volatile ChannelFuture channelFuture;
-
-  protected final Class<?> protocol;
-  protected final AtomicInteger sequence = new AtomicInteger(0);
-
-  private final RpcConnectionKey key;
-  private final AtomicInteger counter = new AtomicInteger(0);   // reference counter
-
-  public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
-      throws ClassNotFoundException, NoSuchMethodException {
-    this.key = rpcConnectionKey;
-    this.protocol = rpcConnectionKey.protocolClass;
-    this.numRetries = numRetries;
-  }
-
-  // should be called from sub class
-  protected void init(ChannelInitializer<Channel> initializer) {
-    this.bootstrap = new Bootstrap();
-    this.bootstrap
-      .channel(NioSocketChannel.class)
-      .handler(initializer)
-      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-      .option(ChannelOption.SO_REUSEADDR, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT)
-      .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
-      .option(ChannelOption.TCP_NODELAY, true);
-  }
-
-  public RpcConnectionPool.RpcConnectionKey getKey() {
-    return key;
-  }
-
-  protected final Class<?> getServiceClass() throws ClassNotFoundException {
-    String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
-    return Class.forName(serviceClassName);
-  }
-
-  @SuppressWarnings("unchecked")
-  protected final <T> T getStub(Method stubMethod, Object rpcChannel) {
-    try {
-      return (T) stubMethod.invoke(null, rpcChannel);
-    } catch (Exception e) {
-      throw new RemoteException(e.getMessage(), e);
-    }
-  }
-
-  public abstract <T> T getStub();
-
-  public boolean acquire(long timeout) {
-    if (!checkConnection(timeout)) {
-      return false;
-    }
-    counter.incrementAndGet();
-    return true;
-  }
-
-  public boolean release() {
-    return counter.decrementAndGet() == 0;
-  }
-
-  private boolean checkConnection(long timeout) {
-    if (isConnected()) {
-      return true;
-    }
-
-    InetSocketAddress addr = key.addr;
-    if (addr.isUnresolved()) {
-      addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
-    }
-
-    return handleConnectionInternally(addr, timeout);
-  }
-
-  private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
-    LOG.warn("Try to connect : " + address);
-    this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
-            .connect(address)
-            .addListener(listener);
-  }
-
-  // first attendant kicks connection
-  private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>();
-
-  private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) {
-    final CountDownLatch ticket = new CountDownLatch(1);
-    final CountDownLatch granted = connect.check(ticket);
-
-    // basically, it's double checked lock
-    if (ticket == granted && isConnected()) {
-      granted.countDown();
-      return true;
-    }
-
-    if (ticket == granted) {
-      connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
-    }
-
-    try {
-      granted.await(timeout, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      // ignore
-    }
-
-    boolean success = channelFuture.isSuccess();
-
-    if (granted.getCount() == 0) {
-      connect.clear(granted);
-    }
-
-    return success;
-  }
-
-  class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
-    private final AtomicInteger retryCount = new AtomicInteger();
-    private final InetSocketAddress address;
-    private final CountDownLatch latch;
-
-    RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) {
-      this.address = address;
-      this.latch = latch;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture channelFuture) throws Exception {
-      if (!channelFuture.isSuccess()) {
-        channelFuture.channel().close();
-
-        if (numRetries > retryCount.getAndIncrement()) {
-          final GenericFutureListener<ChannelFuture> currentListener = this;
-
-          RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() {
-            @Override
-            public void run() {
-              connectUsingNetty(address, currentListener);
-            }
-          }, PAUSE, TimeUnit.MILLISECONDS);
-
-          LOG.debug("Connecting to " + address + " has been failed. Retrying to connect.");
-        }
-        else {
-          latch.countDown();
-
-          LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
-        }
-      }
-      else {
-        latch.countDown();
-      }
-    }
-  }
-
-  public Channel getChannel() {
-    return channelFuture == null ? null : channelFuture.channel();
-  }
-
-  public boolean isConnected() {
-    Channel channel = getChannel();
-    return channel != null && channel.isOpen() && channel.isActive();
-  }
-
-  public SocketAddress getRemoteAddress() {
-    Channel channel = getChannel();
-    return channel == null ? null : channel.remoteAddress();
-  }
-
-  @Override
-  public void close() {
-    Channel channel = getChannel();
-    if (channel != null && channel.isOpen()) {
-      LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
-      channel.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
deleted file mode 100644
index b7f4537..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-public class NettyRpcController implements RpcController {
-  private String errorText;
-
-  @Override
-  public void reset() {
-    errorText = null;
-  }
-
-  @Override
-  public boolean failed() {
-    return errorText != null;
-  }
-
-  @Override
-  public String errorText() {
-    return errorText;
-  }
-
-  @Override
-  public void startCancel() {
-    // TODO - to be implemented
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setFailed(String s) {
-    errorText = s;
-  }
-
-  @Override
-  public boolean isCanceled() {
-    // TODO - to be implemented
-    return false;
-  }
-
-  @Override
-  public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) {
-    throw new UnsupportedOperationException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
deleted file mode 100644
index 024108b..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class NettyServerBase {
-  private static final Log LOG = LogFactory.getLog(NettyServerBase.class);
-  private static final String DEFAULT_PREFIX = "RpcServer_";
-  private static final AtomicInteger sequenceId = new AtomicInteger(0);
-
-  protected String serviceName;
-  protected InetSocketAddress serverAddr;
-  protected InetSocketAddress bindAddress;
-  protected ChannelInitializer<Channel> initializer;
-  protected ServerBootstrap bootstrap;
-  protected ChannelFuture channelFuture;
-  protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-
-  private InetSocketAddress initIsa;
-
-  public NettyServerBase(InetSocketAddress address) {
-    this.initIsa = address;
-  }
-
-  public NettyServerBase(String serviceName, InetSocketAddress addr) {
-    this.serviceName = serviceName;
-    this.initIsa = addr;
-  }
-
-  public void setName(String name) {
-    this.serviceName = name;
-  }
-
-  public void init(ChannelInitializer<Channel> initializer, int workerNum) {
-    bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
-
-    this.initializer = initializer;
-    bootstrap
-      .channel(NioServerSocketChannel.class)
-      .childHandler(initializer)
-      .option(ChannelOption.SO_REUSEADDR, true)
-      .option(ChannelOption.TCP_NODELAY, true)
-      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-      .childOption(ChannelOption.TCP_NODELAY, true)
-      .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
-      .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10);
-  }
-
-  public InetSocketAddress getListenAddress() {
-    return this.bindAddress;
-  }
-
-  public void start() {
-    if (serviceName == null) {
-      this.serviceName = getNextDefaultServiceName();
-    }
-
-    if (initIsa.getPort() == 0) {
-      try {
-        int port = getUnusedPort();
-        serverAddr = new InetSocketAddress(initIsa.getHostName(), port);
-      } catch (IOException e) {
-        LOG.error(e, e);
-      }
-    } else {
-      serverAddr = initIsa;
-    }
-
-    this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly();
-    this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress();
-
-    LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress);
-  }
-
-  public Channel getChannel() {
-    return this.channelFuture.channel();
-  }
-
-  public void shutdown() {
-    shutdown(false);
-  }
-
-  public void shutdown(boolean waitUntilThreadsStop) {
-    try {
-      accepted.close();
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-    }
-
-    if(bootstrap != null) {
-      if (bootstrap.childGroup() != null) {
-        bootstrap.childGroup().shutdownGracefully();
-        if (waitUntilThreadsStop) {
-          bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
-        }
-      }
-
-      if (bootstrap.group() != null) {
-        bootstrap.group().shutdownGracefully();
-        if (waitUntilThreadsStop) {
-          bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
-        }
-      }
-    }
-
-    if (bindAddress != null) {
-      LOG.info("Rpc (" + serviceName + ") listened on "
-          + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
-    }
-  }
-
-  private static String getNextDefaultServiceName() {
-    return DEFAULT_PREFIX + sequenceId.getAndIncrement();
-  }
-
-  private static final int startPortRange = 10000;
-  private static final int endPortRange = 50000;
-  private static final Random rnd = new Random(System.currentTimeMillis());
-  // each system has a different starting port number within the given range.
-  private static final AtomicInteger nextPortNum =
-      new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange));
-  private static final Object lockObject = new Object();
-
-
-  private synchronized static int getUnusedPort() throws IOException {
-    while (true) {
-      int port = nextPortNum.getAndIncrement();
-      if (port >= endPortRange) {
-        synchronized (lockObject) {
-          nextPortNum.set(startPortRange);
-          port = nextPortNum.getAndIncrement();
-        }
-      }
-      if (available(port)) {
-        return port;
-      }
-    }
-  }
-
-  private static boolean available(int port) throws IOException {
-    if (port < 1024 || port > 65535) {
-      throw new IllegalArgumentException("Port Number Out of Bound: " + port);
-    }
-
-    ServerSocket ss = null;
-    DatagramSocket ds = null;
-
-    try {
-      ss = new ServerSocket(port);
-      ss.setReuseAddress(true);
-
-      ds = new DatagramSocket(port);
-      ds.setReuseAddress(true);
-
-      return true;
-
-    } catch (IOException e) {
-      return false;
-    } finally {
-      if (ss != null) {
-        ss.close();
-      }
-
-      if (ds != null) {
-        ds.close();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
deleted file mode 100644
index 9b7f8ac..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-
-public class NullCallback implements RpcCallback<Object> {
-  private final static NullCallback instance;
-
-  static {
-    instance = new NullCallback();
-  }
-
-  public static RpcCallback get() {
-    return instance;
-  }
-
-  @Override
-  public void run(Object parameter) {
-    // do nothing
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
deleted file mode 100644
index 6a340dc..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-
-import com.google.protobuf.MessageLite;
-
-class ProtoChannelInitializer extends ChannelInitializer<Channel> {
-  private final MessageLite defaultInstance;
-  private final ChannelHandler handler;
-
-  public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
-    this.handler = handler;
-    this.defaultInstance = defaultInstance;
-  }
-
-  @Override
-  protected void initChannel(Channel channel) throws Exception {
-    ChannelPipeline pipeline = channel.pipeline();
-    pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
-    pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
-    pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
-    pipeline.addLast("protobufEncoder", new ProtobufEncoder());
-    pipeline.addLast("handler", handler);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
deleted file mode 100644
index 52ef31a..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.Writer;
-
-public class RemoteCallException extends RemoteException {
-  private int seqId;
-  private String originExceptionClass;
-
-  public RemoteCallException(int seqId, MethodDescriptor methodDesc,
-                             Throwable t) {
-    super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t);
-    this.seqId = seqId;
-    if (t != null) {
-      originExceptionClass = t.getClass().getCanonicalName();
-    }
-  }
-
-  public RemoteCallException(int seqId, Throwable t) {
-    super(t);
-    this.seqId = seqId;
-    if (t != null) {
-      originExceptionClass = t.getClass().getCanonicalName();
-    }
-  }
-
-  public RpcResponse getResponse() {
-    RpcResponse.Builder builder = RpcResponse.newBuilder();
-    builder.setId(seqId);
-    if (getCause().getMessage() == null) {
-      builder.setErrorMessage(getCause().getClass().getName());
-    } else {
-      builder.setErrorMessage(getCause().getMessage());
-    }
-    builder.setErrorTrace(getStackTraceString(getCause()));
-    builder.setErrorClass(originExceptionClass);
-
-    return builder.build();
-  }
-
-  private static String getStackTraceString(Throwable aThrowable) {
-    final Writer result = new StringWriter();
-    final PrintWriter printWriter = new PrintWriter(result);
-    aThrowable.printStackTrace(printWriter);
-    return result.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java
deleted file mode 100644
index 30c110d..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-public class RemoteException extends RuntimeException {
-  public RemoteException() {
-    super();
-  }
-
-  public RemoteException(String message) {
-    super(message);
-  }
-
-  public RemoteException(Throwable t) {
-    super(t);
-  }
-
-  public RemoteException(String message, Throwable t) {
-    super(message, t);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
deleted file mode 100644
index 3c054ad..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-public class RetriesExhaustedException extends RuntimeException {
-  private static final long serialVersionUID = 1876775844L;
-
-  public RetriesExhaustedException(final String msg) {
-    super(msg);
-  }
-
-  public RetriesExhaustedException(final String msg, final IOException e) {
-    super(msg, e);
-  }
-
-  /**
-   * Datastructure that allows adding more info around Throwable incident.
-   */
-  public static class ThrowableWithExtraContext {
-    private final Throwable t;
-    private final long when;
-    private final String extras;
-
-    public ThrowableWithExtraContext(final Throwable t, final long when,
-        final String extras) {
-      this.t = t;
-      this.when = when;
-      this.extras = extras;
-    }
- 
-    @Override
-    public String toString() {
-      return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
-    }
-  }
-
-  /**
-   * Create a new RetriesExhaustedException from the list of prior failures.
-   * @param callableVitals Details from the {@link ServerCallable} we were using
-   * when we got this exception.
-   * @param numTries The number of tries we made
-   * @param exceptions List of exceptions that failed before giving up
-   */
-  public RetriesExhaustedException(final String callableVitals, int numTries,
-      List<Throwable> exceptions) {
-    super(getMessage(callableVitals, numTries, exceptions));
-  }
-
-  /**
-   * Create a new RetriesExhaustedException from the list of prior failures.
-   * @param numTries
-   * @param exceptions List of exceptions that failed before giving up
-   */
-  public RetriesExhaustedException(final int numTries,
-      final List<Throwable> exceptions) {
-    super(getMessage(numTries, exceptions));
-  }
-
-  private static String getMessage(String callableVitals, int numTries,
-      List<Throwable> exceptions) {
-    StringBuilder buffer = new StringBuilder("Failed contacting ");
-    buffer.append(callableVitals);
-    buffer.append(" after ");
-    buffer.append(numTries + 1);
-    buffer.append(" attempts.\nExceptions:\n");
-    for (Throwable t : exceptions) {
-      buffer.append(t.toString());
-      buffer.append("\n");
-    }
-    return buffer.toString();
-  }
-
-  private static String getMessage(final int numTries,
-      final List<Throwable> exceptions) {
-    StringBuilder buffer = new StringBuilder("Failed after attempts=");
-    buffer.append(numTries + 1);
-    buffer.append(", exceptions:\n");
-    for (Throwable t : exceptions) {
-      buffer.append(t.toString());
-      buffer.append("\n");
-    }
-    return buffer.toString();
-  }
-}
\ No newline at end of file


Mime
View raw message