tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject tajo git commit: TAJO-1281: Remove hadoop-common dependency from tajo-rpc.
Date Sat, 10 Jan 2015 07:13:43 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 807868bd4 -> 9eac34fe3


TAJO-1281: Remove hadoop-common dependency from tajo-rpc.

Closes #343


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9eac34fe
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9eac34fe
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9eac34fe

Branch: refs/heads/master
Commit: 9eac34fe3121b8bc6bbe1b81ca76852ddedb2603
Parents: 807868b
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Sat Jan 10 16:12:08 2015 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Sat Jan 10 16:12:08 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tajo/catalog/AbstractCatalogClient.java     | 11 +---
 .../apache/tajo/client/SessionConnection.java   |  2 +-
 .../org/apache/tajo/master/QueryInProgress.java |  4 +-
 .../apache/tajo/master/TajoContainerProxy.java  | 14 ++--
 .../apache/tajo/querymaster/QueryMaster.java    |  2 +-
 .../tajo/worker/ExecutionBlockContext.java      |  2 +-
 .../tajo/worker/TajoResourceAllocator.java      |  6 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  2 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  2 +-
 tajo-rpc/pom.xml                                | 27 ++++----
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  3 +-
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  5 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |  3 +-
 .../org/apache/tajo/rpc/NettyServerBase.java    |  4 +-
 .../org/apache/tajo/rpc/RpcChannelFactory.java  | 18 ++++--
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 13 ++--
 .../main/java/org/apache/tajo/rpc/RpcUtils.java | 68 ++++++++++++++++++++
 .../org/apache/tajo/rpc/ServerCallable.java     |  5 +-
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |  7 +-
 .../org/apache/tajo/rpc/TestBlockingRpc.java    | 12 ++--
 21 files changed, 137 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 89488da..b5578e7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1281: Remove hadoop-common dependency from tajo-rpc. (hyunsik)
+
     TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 8ef1c9a..1a2fd44 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.catalog;
 
 import com.google.protobuf.ServiceException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.annotation.Nullable;
@@ -27,21 +26,15 @@ import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
 import org.apache.tajo.catalog.exception.NoSuchFunctionException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.net.InetSocketAddress;
@@ -62,7 +55,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
 
   public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
-    this.pool = RpcConnectionPool.getPool(conf);
+    this.pool = RpcConnectionPool.getPool();
     this.catalogServerAddr = catalogServerAddr;
     this.conf = conf;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 1bc8050..5490be4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -105,7 +105,7 @@ public class SessionConnection implements Closeable {
     this.tajoMasterAddr = addr;
     int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
     // Don't share connection pool per client
-    connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
+    connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
     userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 73d8cb2..7587543 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -96,7 +96,7 @@ public class QueryInProgress {
     masterContext.getResourceManager().releaseQueryMaster(queryId);
 
     if(queryMasterRpc != null) {
-      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+      RpcConnectionPool.getPool().closeConnection(queryMasterRpc);
     }
 
     masterContext.getHistoryWriter().appendHistory(queryInfo);
@@ -130,7 +130,7 @@ public class QueryInProgress {
     InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
     queryMasterRpc =
-        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class,
true);
+        RpcConnectionPool.getPool().getConnection(addr, QueryMasterProtocol.class, true);
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 2ffd7ca..588b7ee 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -25,18 +25,18 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.master.container.TajoContainer;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.ha.HAServiceUtil;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -79,13 +79,13 @@ public class TajoContainerProxy extends ContainerProxy {
     NettyClientBase tajoWorkerRpc = null;
     try {
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
-      tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class,
true);
+      tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class,
true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
       tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     } finally {
-      RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+      RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
     }
   }
 
@@ -96,7 +96,7 @@ public class TajoContainerProxy extends ContainerProxy {
           .getQueryMasterManagerService().getBindAddr();
 
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
-      tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class,
true);
+      tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class,
true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
 
       TajoWorkerProtocol.RunExecutionBlockRequestProto request =
@@ -114,7 +114,7 @@ public class TajoContainerProxy extends ContainerProxy {
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     } finally {
-      RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+      RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
     }
   }
 
@@ -166,7 +166,7 @@ public class TajoContainerProxy extends ContainerProxy {
       containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
     }
 
-    RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
+    RpcConnectionPool connPool = RpcConnectionPool.getPool();
     NettyClientBase tmClient = null;
     try {
       // In TajoMaster HA mode, if backup master be active status,

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 53390a1..be78fc3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -101,7 +101,7 @@ public class QueryMaster extends CompositeService implements EventHandler
{
     LOG.info("QueryMaster init");
     try {
       this.systemConf = (TajoConf)conf;
-      this.connPool = RpcConnectionPool.getPool(systemConf);
+      this.connPool = RpcConnectionPool.getPool();
 
       querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
       queryMasterContext = new QueryMasterContext(systemConf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index dd3ee68..b120d5b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -100,7 +100,7 @@ public class ExecutionBlockContext {
       throws Throwable {
     this.manager = manager;
     this.executionBlockId = event.getExecutionBlockId();
-    this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
+    this.connPool = RpcConnectionPool.getPool();
     this.queryMaster = queryMaster;
     this.systemConf = manager.getTajoConf();
     this.reporter = new Reporter();

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 04b65d2..7278317 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -186,14 +186,14 @@ public class TajoResourceAllocator extends AbstractResourceAllocator
{
     NettyClientBase tajoWorkerRpc = null;
     try {
       InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort());
-      tajoWorkerRpc = RpcConnectionPool.getPool(tajoConf).getConnection(addr, TajoWorkerProtocol.class,
true);
+      tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class,
true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
 
       tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get());
     } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
     } finally {
-      RpcConnectionPool.getPool(tajoConf).releaseConnection(tajoWorkerRpc);
+      RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
     }
   }
 
@@ -271,7 +271,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
           .build();
 
-      RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
+      RpcConnectionPool connPool = RpcConnectionPool.getPool();
       NettyClientBase tmClient = null;
       try {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index b92c4cd..676c72b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -74,7 +74,7 @@ public class WorkerHeartbeatService extends AbstractService {
     Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf
instance.");
     this.systemConf = (TajoConf) conf;
 
-    connectionPool = RpcConnectionPool.getPool(systemConf);
+    connectionPool = RpcConnectionPool.getPool();
     super.serviceInit(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 68890e3..9d9f39c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -40,7 +40,7 @@ import java.net.InetSocketAddress;
 public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
   
   private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception {
-    RpcConnectionPool pool = RpcConnectionPool.getPool(tajoConf);
+    RpcConnectionPool pool = RpcConnectionPool.getPool();
     NettyClientBase masterClient = null;
     InetSocketAddress masterAddress = null;
     

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 1e00e70..d0037ca 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -137,23 +137,24 @@
 
   <dependencies>
     <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-common</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.code.gson</groupId>
-          <artifactId>gson</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
     </dependency>
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 7a416a8..4b1842e 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -24,7 +24,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
@@ -180,7 +179,7 @@ public class AsyncRpcClient extends NettyClientBase {
 
   private String getErrorMessage(String message) {
     return "Exception [" + protocol.getCanonicalName() +
-        "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+        "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
         getChannel().getRemoteAddress()) + ")]: " + message;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 03d5d3e..869919c 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
@@ -159,7 +158,7 @@ public class BlockingRpcClient extends NettyClientBase {
   private String getErrorMessage(String message) {
     if(protocol != null && getChannel() != null) {
       return protocol.getName() +
-          "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+          "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
           getChannel().getRemoteAddress()) + "): " + message;
     } else {
       return "Exception " + message;
@@ -169,7 +168,7 @@ public class BlockingRpcClient extends NettyClientBase {
   private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause)
{
     if(protocol != null && getChannel() != null) {
       return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
-          NetUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
+          RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
     } else {
       return new TajoServiceException(response.getErrorMessage());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index d0002de..bc0c567 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -21,7 +21,6 @@ package org.apache.tajo.rpc;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -110,7 +109,7 @@ public abstract class NettyClientBase implements Closeable {
 
   public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
     if(addr.isUnresolved()){
-       addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+       addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
     }
 
     handleConnectionInternally(addr);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index e75418d..ef090ff 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -20,12 +20,10 @@ package org.apache.tajo.rpc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.DefaultChannelFuture;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 
@@ -126,7 +124,7 @@ public class NettyServerBase {
 
     if (bindAddress != null) {
       LOG.info("Rpc (" + serviceName + ") listened on "
-          + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+          + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index 6274eff..0727f71 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -21,7 +21,6 @@ package org.apache.tajo.rpc;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.conf.TajoConf;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.*;
@@ -34,6 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public final class RpcChannelFactory {
   private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
+
+  private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors()
* 2;
+
   private static ClientSocketChannelFactory factory;
   private static AtomicInteger clientCount = new AtomicInteger(0);
   private static AtomicInteger serverCount = new AtomicInteger(0);
@@ -45,11 +47,19 @@ public final class RpcChannelFactory {
    * make this factory static thus all clients can share its thread pool.
    * NioClientSocketChannelFactory has only one method newChannel() visible for user, which
is thread-safe
    */
-  public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(){
+  public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory() {
+    return getSharedClientChannelFactory(DEFAULT_WORKER_NUM);
+  }
+
+  /**
+   * make this factory static thus all clients can share its thread pool.
+   * NioClientSocketChannelFactory has only one method newChannel() visible for user, which
is thread-safe
+   *
+   * @param workerNum The number of workers
+   */
+  public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(int
workerNum){
     //shared woker and boss pool
     if(factory == null){
-      TajoConf conf = new TajoConf();
-      int workerNum = conf.getIntVar(TajoConf.ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM);
       factory = createClientChannelFactory("Internal-Client", workerNum);
     }
     return factory;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index 2f3d433..c8e622b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -21,7 +21,6 @@ package org.apache.tajo.rpc;
 import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.conf.TajoConf;
 import org.jboss.netty.channel.ConnectTimeoutException;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -43,25 +42,23 @@ public class RpcConnectionPool {
 
   private static RpcConnectionPool instance;
   private final ClientSocketChannelFactory channelFactory;
-  private final TajoConf conf;
 
   public final static int RPC_RETRIES = 3;
 
-  private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) {
-    this.conf = conf;
+  private RpcConnectionPool(ClientSocketChannelFactory channelFactory) {
     this.channelFactory =  channelFactory;
   }
 
-  public synchronized static RpcConnectionPool getPool(TajoConf conf) {
+  public synchronized static RpcConnectionPool getPool() {
     if(instance == null) {
       InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
-      instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory());
+      instance = new RpcConnectionPool(RpcChannelFactory.getSharedClientChannelFactory());
     }
     return instance;
   }
 
-  public synchronized static RpcConnectionPool newPool(TajoConf conf, String poolName, int
workerNum) {
-    return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName,
workerNum));
+  public synchronized static RpcConnectionPool newPool(String poolName, int workerNum) {
+    return new RpcConnectionPool(RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
   }
 
   private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
new file mode 100644
index 0000000..b6be05f
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+public class RpcUtils {
+
+  public static String normalizeInetSocketAddress(InetSocketAddress addr) {
+    return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+  }
+
+  /**
+   * Util method to build socket addr from either:
+   *   <host>
+   *   <host>:<port>
+   *   <fs>://<host>:<port>/<path>
+   */
+  public static InetSocketAddress createSocketAddr(String host, int port) {
+    return new InetSocketAddress(host, port);
+  }
+
+  /**
+   * Returns InetSocketAddress that a client can use to
+   * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+   * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+   * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+   *
+   * @param addr of a listener
+   * @return socket address that a client can use to connect to the server.
+   */
+  public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+    if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+      try {
+        addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+      } catch (UnknownHostException uhe) {
+        // shouldn't get here unless the host doesn't have a loopback iface
+        addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+      }
+    }
+    InetSocketAddress canonicalAddress =
+        new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort());
+    return canonicalAddress;
+  }
+
+  public static InetSocketAddress createUnresolved(String addr) {
+    String [] splitted = addr.split(":");
+    return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index b4e5f9a..140f781 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -18,15 +18,14 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.google.protobuf.ServiceException;
-import org.apache.tajo.conf.TajoConf;
-
 public abstract class ServerCallable<T> {
   protected InetSocketAddress addr;
   protected long startTime;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 7c8246a..61a92bc 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -27,7 +27,6 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
 import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.ConnectTimeoutException;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.junit.After;
@@ -65,7 +64,7 @@ public class TestAsyncRpc {
         service, new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
     client = new AsyncRpcClient(DummyProtocol.class,
-        NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+        RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
     stub = client.getStub();
   }
 
@@ -256,9 +255,9 @@ public class TestAsyncRpc {
     client.close();
     client = null;
 
-    String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
+    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
     client = new AsyncRpcClient(DummyProtocol.class,
-        NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+        RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
     Interface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();

http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 28a3fad..746bfcb 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -18,14 +18,12 @@
 
 package org.apache.tajo.rpc;
 
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.rpc.test.DummyProtocol;
 import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
 import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
 import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.junit.After;
 import org.junit.Before;
@@ -59,7 +57,7 @@ public class TestBlockingRpc {
         new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
     client = new BlockingRpcClient(DummyProtocol.class,
-        NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+        RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
     stub = client.getStub();
   }
 
@@ -96,7 +94,7 @@ public class TestBlockingRpc {
 
   @Test
   public void testRpcWithServiceCallable() throws Exception {
-    RpcConnectionPool pool = RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(),
2);
+    RpcConnectionPool pool = RpcConnectionPool.newPool(getClass().getSimpleName(), 2);
     final SumRequest request = SumRequest.newBuilder()
         .setX1(1)
         .setX2(2)
@@ -187,7 +185,7 @@ public class TestBlockingRpc {
     try {
       int port = server.getListenAddress().getPort() + 1;
       new BlockingRpcClient(DummyProtocol.class,
-          NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory,
retries);
+          RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory,
retries);
       fail("Connection should be failed.");
     } catch (ConnectException ce) {
       expected = true;
@@ -260,9 +258,9 @@ public class TestBlockingRpc {
     client.close();
     client = null;
 
-    String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
+    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
     client = new BlockingRpcClient(DummyProtocol.class,
-        NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+        RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
     BlockingInterface stub = client.getStub();
 
     EchoMessage message = EchoMessage.newBuilder()


Mime
View raw message