Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4BE541015B for ; Sat, 10 Jan 2015 07:13:42 +0000 (UTC) Received: (qmail 54548 invoked by uid 500); 10 Jan 2015 07:13:43 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 54514 invoked by uid 500); 10 Jan 2015 07:13:43 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 54505 invoked by uid 99); 10 Jan 2015 07:13:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Jan 2015 07:13:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 43DA19A7712; Sat, 10 Jan 2015 07:13:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hyunsik@apache.org To: commits@tajo.apache.org Message-Id: <60cfd643c9d940b4bcafee09a6516651@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tajo git commit: TAJO-1281: Remove hadoop-common dependency from tajo-rpc. Date: Sat, 10 Jan 2015 07:13:43 +0000 (UTC) Repository: tajo Updated Branches: refs/heads/master 807868bd4 -> 9eac34fe3 TAJO-1281: Remove hadoop-common dependency from tajo-rpc. Closes #343 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9eac34fe Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9eac34fe Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9eac34fe Branch: refs/heads/master Commit: 9eac34fe3121b8bc6bbe1b81ca76852ddedb2603 Parents: 807868b Author: Hyunsik Choi Authored: Sat Jan 10 16:12:08 2015 +0900 Committer: Hyunsik Choi Committed: Sat Jan 10 16:12:08 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/AbstractCatalogClient.java | 11 +--- .../apache/tajo/client/SessionConnection.java | 2 +- .../org/apache/tajo/master/QueryInProgress.java | 4 +- .../apache/tajo/master/TajoContainerProxy.java | 14 ++-- .../apache/tajo/querymaster/QueryMaster.java | 2 +- .../tajo/worker/ExecutionBlockContext.java | 2 +- .../tajo/worker/TajoResourceAllocator.java | 6 +- .../tajo/worker/WorkerHeartbeatService.java | 2 +- .../ConnectivityCheckerRuleForTajoWorker.java | 2 +- tajo-rpc/pom.xml | 27 ++++---- .../org/apache/tajo/rpc/AsyncRpcClient.java | 3 +- .../org/apache/tajo/rpc/BlockingRpcClient.java | 5 +- .../org/apache/tajo/rpc/NettyClientBase.java | 3 +- .../org/apache/tajo/rpc/NettyServerBase.java | 4 +- .../org/apache/tajo/rpc/RpcChannelFactory.java | 18 ++++-- .../org/apache/tajo/rpc/RpcConnectionPool.java | 13 ++-- .../main/java/org/apache/tajo/rpc/RpcUtils.java | 68 ++++++++++++++++++++ .../org/apache/tajo/rpc/ServerCallable.java | 5 +- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 7 +- .../org/apache/tajo/rpc/TestBlockingRpc.java | 12 ++-- 21 files changed, 137 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 89488da..b5578e7 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,8 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1281: Remove hadoop-common dependency from tajo-rpc. (hyunsik) + TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 8ef1c9a..1a2fd44 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -19,7 +19,6 @@ package org.apache.tajo.catalog; import com.google.protobuf.ServiceException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.annotation.Nullable; @@ -27,21 +26,15 @@ import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService; import org.apache.tajo.catalog.exception.NoSuchFunctionException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; -import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.ProtoUtil; import java.net.InetSocketAddress; @@ -62,7 +55,7 @@ public abstract class AbstractCatalogClient implements CatalogService { abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client); public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) { - this.pool = RpcConnectionPool.getPool(conf); + this.pool = RpcConnectionPool.getPool(); this.catalogServerAddr = catalogServerAddr; this.conf = conf; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 1bc8050..5490be4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -105,7 +105,7 @@ public class SessionConnection implements Closeable { this.tajoMasterAddr = addr; int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM); // Don't share connection pool per client - connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum); + connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum); userInfo = UserRoleInfo.getCurrentUser(); this.baseDatabase = baseDatabase != null ? baseDatabase : null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 73d8cb2..7587543 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -96,7 +96,7 @@ public class QueryInProgress { masterContext.getResourceManager().releaseQueryMaster(queryId); if(queryMasterRpc != null) { - RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc); + RpcConnectionPool.getPool().closeConnection(queryMasterRpc); } masterContext.getHistoryWriter().appendHistory(queryInfo); @@ -130,7 +130,7 @@ public class QueryInProgress { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); queryMasterRpc = - RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true); + RpcConnectionPool.getPool().getConnection(addr, QueryMasterProtocol.class, true); queryMasterRpcClient = queryMasterRpc.getStub(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 2ffd7ca..588b7ee 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -25,18 +25,18 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.rm.TajoWorkerContainer; import org.apache.tajo.master.rm.TajoWorkerContainerId; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.ha.HAServiceUtil; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -79,13 +79,13 @@ public class TajoContainerProxy extends ContainerProxy { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get()); } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { - RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc); + RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -96,7 +96,7 @@ public class TajoContainerProxy extends ContainerProxy { .getQueryMasterManagerService().getBindAddr(); InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); TajoWorkerProtocol.RunExecutionBlockRequestProto request = @@ -114,7 +114,7 @@ public class TajoContainerProxy extends ContainerProxy { } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { - RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc); + RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -166,7 +166,7 @@ public class TajoContainerProxy extends ContainerProxy { containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId)); } - RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf()); + RpcConnectionPool connPool = RpcConnectionPool.getPool(); NettyClientBase tmClient = null; try { // In TajoMaster HA mode, if backup master be active status, http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 53390a1..be78fc3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -101,7 +101,7 @@ public class QueryMaster extends CompositeService implements EventHandler { LOG.info("QueryMaster init"); try { this.systemConf = (TajoConf)conf; - this.connPool = RpcConnectionPool.getPool(systemConf); + this.connPool = RpcConnectionPool.getPool(); querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); queryMasterContext = new QueryMasterContext(systemConf); http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index dd3ee68..b120d5b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -100,7 +100,7 @@ public class ExecutionBlockContext { throws Throwable { this.manager = manager; this.executionBlockId = event.getExecutionBlockId(); - this.connPool = RpcConnectionPool.getPool(manager.getTajoConf()); + this.connPool = RpcConnectionPool.getPool(); this.queryMaster = queryMaster; this.systemConf = manager.getTajoConf(); this.reporter = new Reporter(); http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 04b65d2..7278317 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -186,14 +186,14 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool(tajoConf).getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get()); } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { - RpcConnectionPool.getPool(tajoConf).releaseConnection(tajoWorkerRpc); + RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -271,7 +271,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) .build(); - RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf()); + RpcConnectionPool connPool = RpcConnectionPool.getPool(); NettyClientBase tmClient = null; try { http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index b92c4cd..676c72b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -74,7 +74,7 @@ public class WorkerHeartbeatService extends AbstractService { Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance."); this.systemConf = (TajoConf) conf; - connectionPool = RpcConnectionPool.getPool(systemConf); + connectionPool = RpcConnectionPool.getPool(); super.serviceInit(conf); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index 68890e3..9d9f39c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -40,7 +40,7 @@ import java.net.InetSocketAddress; public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule { private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception { - RpcConnectionPool pool = RpcConnectionPool.getPool(tajoConf); + RpcConnectionPool pool = RpcConnectionPool.getPool(); NettyClientBase masterClient = null; InetSocketAddress masterAddress = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml index 1e00e70..d0037ca 100644 --- a/tajo-rpc/pom.xml +++ b/tajo-rpc/pom.xml @@ -137,23 +137,24 @@ - org.apache.tajo - tajo-common - - - com.google.code.gson - gson - - - - io.netty netty - org.apache.hadoop - hadoop-common - provided + commons-logging + commons-logging + + + commons-lang + commons-lang + + + com.google.guava + guava + + + com.google.protobuf + protobuf-java junit http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 7a416a8..4b1842e 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import org.apache.tajo.util.NetUtils; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -180,7 +179,7 @@ public class AsyncRpcClient extends NettyClientBase { private String getErrorMessage(String message) { return "Exception [" + protocol.getCanonicalName() + - "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress) + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) getChannel().getRemoteAddress()) + ")]: " + message; } http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 03d5d3e..869919c 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import org.apache.tajo.util.NetUtils; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -159,7 +158,7 @@ public class BlockingRpcClient extends NettyClientBase { private String getErrorMessage(String message) { if(protocol != null && getChannel() != null) { return protocol.getName() + - "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress) + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) getChannel().getRemoteAddress()) + "): " + message; } else { return "Exception " + message; @@ -169,7 +168,7 @@ public class BlockingRpcClient extends NettyClientBase { private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) { if(protocol != null && getChannel() != null) { return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(), - NetUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress())); + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress())); } else { return new TajoServiceException(response.getErrorMessage()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index d0002de..bc0c567 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -21,7 +21,6 @@ package org.apache.tajo.rpc; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.util.NetUtils; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -110,7 +109,7 @@ public abstract class NettyClientBase implements Closeable { public void connect(InetSocketAddress addr) throws ConnectTimeoutException { if(addr.isUnresolved()){ - addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort()); + addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort()); } handleConnectionInternally(addr); http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index e75418d..ef090ff 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -20,12 +20,10 @@ package org.apache.tajo.rpc; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.util.NetUtils; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.DefaultChannelFuture; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.DefaultChannelGroup; @@ -126,7 +124,7 @@ public class NettyServerBase { if (bindAddress != null) { LOG.info("Rpc (" + serviceName + ") listened on " - + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown"); + + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java index 6274eff..0727f71 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java @@ -21,7 +21,6 @@ package org.apache.tajo.rpc; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.conf.TajoConf; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.ServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.*; @@ -34,6 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger; public final class RpcChannelFactory { private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class); + + private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2; + private static ClientSocketChannelFactory factory; private static AtomicInteger clientCount = new AtomicInteger(0); private static AtomicInteger serverCount = new AtomicInteger(0); @@ -45,11 +47,19 @@ public final class RpcChannelFactory { * make this factory static thus all clients can share its thread pool. * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe */ - public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(){ + public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory() { + return getSharedClientChannelFactory(DEFAULT_WORKER_NUM); + } + + /** + * make this factory static thus all clients can share its thread pool. + * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe + * + * @param workerNum The number of workers + */ + public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(int workerNum){ //shared woker and boss pool if(factory == null){ - TajoConf conf = new TajoConf(); - int workerNum = conf.getIntVar(TajoConf.ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM); factory = createClientChannelFactory("Internal-Client", workerNum); } return factory; http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java index 2f3d433..c8e622b 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java @@ -21,7 +21,6 @@ package org.apache.tajo.rpc; import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.conf.TajoConf; import org.jboss.netty.channel.ConnectTimeoutException; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.DefaultChannelGroup; @@ -43,25 +42,23 @@ public class RpcConnectionPool { private static RpcConnectionPool instance; private final ClientSocketChannelFactory channelFactory; - private final TajoConf conf; public final static int RPC_RETRIES = 3; - private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) { - this.conf = conf; + private RpcConnectionPool(ClientSocketChannelFactory channelFactory) { this.channelFactory = channelFactory; } - public synchronized static RpcConnectionPool getPool(TajoConf conf) { + public synchronized static RpcConnectionPool getPool() { if(instance == null) { InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); - instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory()); + instance = new RpcConnectionPool(RpcChannelFactory.getSharedClientChannelFactory()); } return instance; } - public synchronized static RpcConnectionPool newPool(TajoConf conf, String poolName, int workerNum) { - return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName, workerNum)); + public synchronized static RpcConnectionPool newPool(String poolName, int workerNum) { + return new RpcConnectionPool(RpcChannelFactory.createClientChannelFactory(poolName, workerNum)); } private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java new file mode 100644 index 0000000..b6be05f --- /dev/null +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +public class RpcUtils { + + public static String normalizeInetSocketAddress(InetSocketAddress addr) { + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + + /** + * Util method to build socket addr from either: + * + * : + * ://:/ + */ + public static InetSocketAddress createSocketAddr(String host, int port) { + return new InetSocketAddress(host, port); + } + + /** + * Returns InetSocketAddress that a client can use to + * connect to the server. NettyServerBase.getListenerAddress() is not correct when + * the server binds to "0.0.0.0". This returns "hostname:port" of the server, + * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port". + * + * @param addr of a listener + * @return socket address that a client can use to connect to the server. + */ + public static InetSocketAddress getConnectAddress(InetSocketAddress addr) { + if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) { + try { + addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort()); + } catch (UnknownHostException uhe) { + // shouldn't get here unless the host doesn't have a loopback iface + addr = new InetSocketAddress("127.0.0.1", addr.getPort()); + } + } + InetSocketAddress canonicalAddress = + new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort()); + return canonicalAddress; + } + + public static InetSocketAddress createUnresolved(String addr) { + String [] splitted = addr.split(":"); + return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1])); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java index b4e5f9a..140f781 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java @@ -18,15 +18,14 @@ package org.apache.tajo.rpc; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import com.google.protobuf.ServiceException; -import org.apache.tajo.conf.TajoConf; - public abstract class ServerCallable { protected InetSocketAddress addr; protected long startTime; http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 7c8246a..61a92bc 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -27,7 +27,6 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage; import org.apache.tajo.rpc.test.TestProtos.SumRequest; import org.apache.tajo.rpc.test.TestProtos.SumResponse; import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; -import org.apache.tajo.util.NetUtils; import org.jboss.netty.channel.ConnectTimeoutException; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.junit.After; @@ -65,7 +64,7 @@ public class TestAsyncRpc { service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); client = new AsyncRpcClient(DummyProtocol.class, - NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); + RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); stub = client.getStub(); } @@ -256,9 +255,9 @@ public class TestAsyncRpc { client.close(); client = null; - String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress()); + String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); client = new AsyncRpcClient(DummyProtocol.class, - NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); + RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); Interface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/9eac34fe/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 28a3fad..746bfcb 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -18,14 +18,12 @@ package org.apache.tajo.rpc; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.rpc.test.DummyProtocol; import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface; import org.apache.tajo.rpc.test.TestProtos.EchoMessage; import org.apache.tajo.rpc.test.TestProtos.SumRequest; import org.apache.tajo.rpc.test.TestProtos.SumResponse; import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl; -import org.apache.tajo.util.NetUtils; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.junit.After; import org.junit.Before; @@ -59,7 +57,7 @@ public class TestBlockingRpc { new InetSocketAddress("127.0.0.1", 0), 2); server.start(); client = new BlockingRpcClient(DummyProtocol.class, - NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); + RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); stub = client.getStub(); } @@ -96,7 +94,7 @@ public class TestBlockingRpc { @Test public void testRpcWithServiceCallable() throws Exception { - RpcConnectionPool pool = RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2); + RpcConnectionPool pool = RpcConnectionPool.newPool(getClass().getSimpleName(), 2); final SumRequest request = SumRequest.newBuilder() .setX1(1) .setX2(2) @@ -187,7 +185,7 @@ public class TestBlockingRpc { try { int port = server.getListenAddress().getPort() + 1; new BlockingRpcClient(DummyProtocol.class, - NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries); + RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries); fail("Connection should be failed."); } catch (ConnectException ce) { expected = true; @@ -260,9 +258,9 @@ public class TestBlockingRpc { client.close(); client = null; - String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress()); + String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); client = new BlockingRpcClient(DummyProtocol.class, - NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); + RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); BlockingInterface stub = client.getStub(); EchoMessage message = EchoMessage.newBuilder()