Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0520917B19 for ; Thu, 16 Apr 2015 09:17:43 +0000 (UTC) Received: (qmail 22529 invoked by uid 500); 16 Apr 2015 09:17:42 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 22491 invoked by uid 500); 16 Apr 2015 09:17:42 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 22417 invoked by uid 99); 16 Apr 2015 09:17:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Apr 2015 09:17:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A9A30E1021; Thu, 16 Apr 2015 09:17:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhkim@apache.org To: commits@tajo.apache.org Date: Thu, 16 Apr 2015 09:17:43 -0000 Message-Id: <622c96b39275491c95e5f5722a6947b1@git.apache.org> In-Reply-To: <11da66497e604a0b9f8d7f6c8e121e7d@git.apache.org> References: <11da66497e604a0b9f8d7f6c8e121e7d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] tajo git commit: TAJO-1497: RPC client does not share a connection. (jinho) TAJO-1497: RPC client does not share a connection. (jinho) Closes #533 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7b78668b Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7b78668b Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7b78668b Branch: refs/heads/master Commit: 7b78668b7d90d268bb6065586fe880cda08571c4 Parents: 338a2b7 Author: Jinho Kim Authored: Thu Apr 16 18:16:37 2015 +0900 Committer: Jinho Kim Committed: Thu Apr 16 18:16:37 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/AbstractCatalogClient.java | 86 ++++----- .../tajo/client/CatalogAdminClientImpl.java | 40 ++-- .../org/apache/tajo/client/QueryClientImpl.java | 50 ++--- .../apache/tajo/client/SessionConnection.java | 40 ++-- .../org/apache/tajo/master/QueryInProgress.java | 6 +- .../apache/tajo/master/TajoContainerProxy.java | 38 ++-- .../apache/tajo/querymaster/QueryMaster.java | 24 +-- .../tajo/worker/ExecutionBlockContext.java | 29 +-- .../tajo/worker/TajoResourceAllocator.java | 20 +- .../main/java/org/apache/tajo/worker/Task.java | 73 ++++--- .../java/org/apache/tajo/worker/TaskRunner.java | 2 - .../tajo/worker/WorkerHeartbeatService.java | 10 +- .../ConnectivityCheckerRuleForTajoWorker.java | 26 +-- .../main/java/org/apache/tajo/rpc/RpcUtils.java | 34 ---- .../org/apache/tajo/rpc/AsyncRpcClient.java | 58 +++--- .../org/apache/tajo/rpc/AsyncRpcServer.java | 82 ++++---- .../org/apache/tajo/rpc/BlockingRpcClient.java | 88 +++++---- .../org/apache/tajo/rpc/BlockingRpcServer.java | 85 ++++----- .../tajo/rpc/ConnectionCloseFutureListener.java | 35 ++++ .../org/apache/tajo/rpc/NettyClientBase.java | 124 ++++-------- .../tajo/rpc/ProtoChannelInitializer.java | 11 +- .../org/apache/tajo/rpc/RpcClientManager.java | 185 ++++++++++++++++++ .../org/apache/tajo/rpc/RpcConnectionPool.java | 191 ------------------- .../org/apache/tajo/rpc/ServerCallable.java | 36 ++-- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 72 +++++-- .../org/apache/tajo/rpc/TestBlockingRpc.java | 85 ++++++--- .../apache/tajo/rpc/TestRpcClientManager.java | 97 ++++++++++ 28 files changed, 837 insertions(+), 792 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index fe8dd8e..3938489 100644 --- a/CHANGES +++ b/CHANGES @@ -89,6 +89,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1497: RPC client does not share a connection. (jinho) + TAJO-1467: Parenthesis at the start of SQL query is ignored. (Keuntae Park) http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 458d6e0..49be29a 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -30,7 +30,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; @@ -50,14 +50,14 @@ public abstract class AbstractCatalogClient implements CatalogService { private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class); protected ServiceTracker serviceTracker; - protected RpcConnectionPool pool; + protected RpcClientManager manager; protected InetSocketAddress catalogServerAddr; protected TajoConf conf; abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client); public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) { - this.pool = RpcConnectionPool.getPool(); + this.manager = RpcClientManager.getInstance(); this.catalogServerAddr = catalogServerAddr; this.serviceTracker = ServiceTrackerFactory.get(conf); this.conf = conf; @@ -79,7 +79,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); @@ -98,7 +98,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean dropTablespace(final String tablespaceName) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); @@ -113,7 +113,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean existTablespace(final String tablespaceName) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); @@ -128,7 +128,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection getAllTablespaceNames() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO); @@ -144,7 +144,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllTablespaces() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -162,7 +162,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public TablespaceProto getTablespace(final String tablespaceName) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public TablespaceProto call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName)); @@ -177,7 +177,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.alterTablespace(null, alterTablespace).getValue(); @@ -192,7 +192,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); @@ -213,7 +213,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean dropDatabase(final String databaseName) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); @@ -228,7 +228,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean existDatabase(final String databaseName) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); @@ -243,7 +243,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection getAllDatabaseNames() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO); @@ -259,7 +259,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllDatabases() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -277,7 +277,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final TableDesc getTableDesc(final String databaseName, final String tableName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public TableDesc call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); builder.setDatabaseName(databaseName); @@ -302,7 +302,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllTables() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -320,7 +320,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllTableOptions() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -338,7 +338,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllTableStats() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -356,7 +356,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllColumns() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -374,7 +374,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public PartitionMethodDesc call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -394,7 +394,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean existPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -415,7 +415,7 @@ public abstract class AbstractCatalogClient implements CatalogService { public final PartitionDescProto getPartition(final String databaseName, final String tableName, final String partitionName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public PartitionDescProto call(NettyClientBase client) throws ServiceException { PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); @@ -436,7 +436,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final List getPartitions(final String databaseName, final String tableName) { try { - return new ServerCallable>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, + return new ServerCallable>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public List call(NettyClientBase client) throws ServiceException { @@ -457,7 +457,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllPartitions() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -475,7 +475,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection getAllTableNames(final String databaseName) { try { - return new ServerCallable>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName)); @@ -491,7 +491,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection getFunctions() { try { - return new ServerCallable>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection call(NettyClientBase client) throws ServiceException { List list = new ArrayList(); GetFunctionsResponse response; @@ -518,7 +518,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean createTable(final TableDesc desc) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.createTable(null, desc.getProto()).getValue(); @@ -537,7 +537,7 @@ public abstract class AbstractCatalogClient implements CatalogService { final String simpleName = splitted[1]; try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -561,7 +561,7 @@ public abstract class AbstractCatalogClient implements CatalogService { "tableName cannot be composed of multiple parts, but it is \"" + tableName + "\""); } try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -586,7 +586,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean createIndex(final IndexDesc index) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.createIndex(null, index.getProto()).getValue(); @@ -601,7 +601,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean existIndexByName(final String databaseName, final String indexName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { IndexNameProto.Builder builder = IndexNameProto.newBuilder(); builder.setDatabaseName(databaseName); @@ -620,7 +620,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); @@ -640,7 +640,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final IndexDesc getIndexByName(final String databaseName, final String indexName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public IndexDesc call(NettyClientBase client) throws ServiceException { IndexNameProto.Builder builder = IndexNameProto.newBuilder(); @@ -662,7 +662,7 @@ public abstract class AbstractCatalogClient implements CatalogService { final String tableName, final String columnName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public IndexDesc call(NettyClientBase client) throws ServiceException { GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); @@ -683,7 +683,7 @@ public abstract class AbstractCatalogClient implements CatalogService { public boolean dropIndex(final String databaseName, final String indexName) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { IndexNameProto.Builder builder = IndexNameProto.newBuilder(); @@ -703,7 +703,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List getAllIndexes() { try { - return new ServerCallable>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List call(NettyClientBase client) throws Exception { @@ -721,7 +721,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean createFunction(final FunctionDesc funcDesc) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.createFunction(null, funcDesc.getProto()).getValue(); @@ -736,7 +736,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean dropFunction(final String signature) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder(); builder.setSignature(signature); @@ -769,7 +769,7 @@ public abstract class AbstractCatalogClient implements CatalogService { FunctionDescProto descProto = null; try { - descProto = new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + descProto = new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public FunctionDescProto call(NettyClientBase client) throws ServiceException { try { CatalogProtocolService.BlockingInterface stub = getStub(client); @@ -819,7 +819,7 @@ public abstract class AbstractCatalogClient implements CatalogService { } try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.containFunction(null, builder.build()).getValue(); @@ -834,7 +834,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean alterTable(final AlterTableDesc desc) { try { - return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.alterTable(null, desc.getProto()).getValue(); @@ -849,7 +849,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) { try { - return new ServerCallable(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.updateTableStats(null, updateTableStatsProto).getValue(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 6347ad1..9d0e427 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -48,8 +48,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean createDatabase(final String databaseName) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -64,8 +64,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean existDatabase(final String databaseName) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -80,8 +80,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean dropDatabase(final String databaseName) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -96,8 +96,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public List getAllDatabaseNames() throws ServiceException { - return new ServerCallable>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List call(NettyClientBase client) throws ServiceException { @@ -111,8 +111,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { public boolean existTable(final String tableName) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { connection.checkSessionAndGet(client); @@ -133,8 +133,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) throws SQLException, ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { @@ -169,8 +169,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean dropTable(final String tableName, final boolean purge) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -190,8 +190,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public List getTableList(@Nullable final String databaseName) throws ServiceException { - return new ServerCallable>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List call(NettyClientBase client) throws ServiceException { @@ -213,8 +213,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public TableDesc getTableDesc(final String tableName) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { @@ -238,8 +238,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public List getFunctions(final String functionName) throws ServiceException { - return new ServerCallable>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(connection.manager, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public List call(NettyClientBase client) throws ServiceException, SQLException { http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 4444a31..99c58b6 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -19,7 +19,6 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.*; @@ -33,7 +32,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.util.ProtoUtil; @@ -115,8 +113,6 @@ public class QueryClientImpl implements QueryClient { tajoMaster.closeNonForwardQuery(null, builder.build()); } catch (Exception e) { LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); - } finally { - connection.connPool.closeConnection(tmClient); } } @@ -158,8 +154,8 @@ public class QueryClientImpl implements QueryClient { @Override public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { @@ -184,8 +180,8 @@ public class QueryClientImpl implements QueryClient { @Override public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { @@ -321,8 +317,6 @@ public class QueryClientImpl implements QueryClient { } catch (Exception e) { throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(tmClient); } return new QueryStatus(res); } @@ -367,8 +361,6 @@ public class QueryClientImpl implements QueryClient { } catch (Exception e) { throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(tmClient); } } @@ -378,8 +370,8 @@ public class QueryClientImpl implements QueryClient { try { final ServerCallable callable = - new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException { @@ -424,8 +416,8 @@ public class QueryClientImpl implements QueryClient { @Override public boolean updateQuery(final String sql) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -454,8 +446,8 @@ public class QueryClientImpl implements QueryClient { @Override public boolean updateQueryWithJson(final String json) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -482,8 +474,8 @@ public class QueryClientImpl implements QueryClient { @Override public List getRunningQueryList() throws ServiceException { - return new ServerCallable>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List call(NettyClientBase client) throws ServiceException { @@ -502,8 +494,8 @@ public class QueryClientImpl implements QueryClient { @Override public List getFinishedQueryList() throws ServiceException { - return new ServerCallable>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List call(NettyClientBase client) throws ServiceException { @@ -522,8 +514,8 @@ public class QueryClientImpl implements QueryClient { @Override public List getClusterInfo() throws ServiceException { - return new ServerCallable>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List call(NettyClientBase client) throws ServiceException { @@ -574,8 +566,6 @@ public class QueryClientImpl implements QueryClient { } catch(Exception e) { LOG.debug("Error when checking for application status", e); - } finally { - connection.connPool.releaseConnection(tmClient); } return status; } @@ -591,8 +581,8 @@ public class QueryClientImpl implements QueryClient { } public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { - return new ServerCallable(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public QueryInfoProto call(NettyClientBase client) throws ServiceException { connection.checkSessionAndGet(client); @@ -621,8 +611,8 @@ public class QueryClientImpl implements QueryClient { InetSocketAddress qmAddress = new InetSocketAddress( queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort()); - return new ServerCallable(connection.connPool, qmAddress, - QueryMasterClientProtocol.class, false, true) { + return new ServerCallable(connection.manager, qmAddress, + QueryMasterClientProtocol.class, false) { public QueryHistoryProto call(NettyClientBase client) throws ServiceException { connection.checkSessionAndGet(client); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 6363198..d8152f4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -30,7 +30,7 @@ import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; @@ -55,7 +55,7 @@ public class SessionConnection implements Closeable { private final Log LOG = LogFactory.getLog(TajoClientImpl.class); - final RpcConnectionPool connPool; + final RpcClientManager manager; private String baseDatabase; @@ -86,8 +86,8 @@ public class SessionConnection implements Closeable { this.properties = properties; - connPool = RpcConnectionPool.getPool(); - userInfo = UserRoleInfo.getCurrentUser(); + this.manager = RpcClientManager.getInstance(); + this.userInfo = UserRoleInfo.getCurrentUser(); this.baseDatabase = baseDatabase != null ? baseDatabase : null; this.serviceTracker = tracker; @@ -99,12 +99,12 @@ public class SessionConnection implements Closeable { public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); + return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); } public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - return connPool.getConnection(addr, protocolClass, asyncMode); + return manager.getClient(addr, protocolClass, asyncMode); } protected KeyValueSet getProperties() { @@ -127,7 +127,7 @@ public class SessionConnection implements Closeable { public boolean isConnected() { if(!closed.get()){ try { - return connPool.getConnection(serviceTracker.getClientServiceAddress(), + return manager.getClient(serviceTracker.getClientServiceAddress(), TajoMasterClientProtocol.class, false).isConnected(); } catch (Throwable e) { return false; @@ -141,7 +141,7 @@ public class SessionConnection implements Closeable { } public String getCurrentDatabase() throws ServiceException { - return new ServerCallable(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public String call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -153,8 +153,8 @@ public class SessionConnection implements Closeable { } public Map updateSessionVariables(final Map variables) throws ServiceException { - return new ServerCallable>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(manager, getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Map call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -179,7 +179,7 @@ public class SessionConnection implements Closeable { } public Map unsetSessionVariables(final List variables) throws ServiceException { - return new ServerCallable>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public Map call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -209,7 +209,7 @@ public class SessionConnection implements Closeable { } public String getSessionVariable(final String varname) throws ServiceException { - return new ServerCallable(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public String call(NettyClientBase client) throws ServiceException { @@ -229,7 +229,7 @@ public class SessionConnection implements Closeable { } public Boolean existSessionVariable(final String varname) throws ServiceException { - return new ServerCallable(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -247,8 +247,8 @@ public class SessionConnection implements Closeable { } public Map getAllSessionVariables() throws ServiceException { - return new ServerCallable>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, - false, true) { + return new ServerCallable>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, + false) { public Map call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -260,8 +260,8 @@ public class SessionConnection implements Closeable { } public Boolean selectDatabase(final String databaseName) throws ServiceException { - Boolean selected = new ServerCallable(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + Boolean selected = new ServerCallable(manager, getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -286,13 +286,13 @@ public class SessionConnection implements Closeable { // remove session NettyClientBase client = null; try { - client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); + client = manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub(); tajoMaster.removeSession(null, sessionId); } catch (Throwable e) { // ignore } finally { - connPool.releaseConnection(client); + RpcClientManager.cleanup(client); } } @@ -329,7 +329,7 @@ public class SessionConnection implements Closeable { } public boolean reconnect() throws Exception { - return new ServerCallable(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 668a770..d2286cf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -33,7 +33,7 @@ import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; @@ -112,7 +112,7 @@ public class QueryInProgress { masterContext.getResourceManager().releaseQueryMaster(queryId); if(queryMasterRpc != null) { - RpcConnectionPool.getPool().closeConnection(queryMasterRpc); + RpcClientManager.cleanup(queryMasterRpc); } try { @@ -157,7 +157,7 @@ public class QueryInProgress { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); queryMasterRpc = - RpcConnectionPool.getPool().getConnection(addr, QueryMasterProtocol.class, true); + RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true); queryMasterRpcClient = queryMasterRpc.getStub(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 139359c..1fda7d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -35,7 +35,7 @@ import org.apache.tajo.master.rm.TajoWorkerContainerId; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.worker.TajoWorker; @@ -83,14 +83,12 @@ public class TajoContainerProxy extends ContainerProxy { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get()); } catch (Throwable e) { /* Worker RPC failure */ context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); - } finally { - RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -101,7 +99,7 @@ public class TajoContainerProxy extends ContainerProxy { .getQueryMasterManagerService().getBindAddr(); InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); TajoWorkerProtocol.RunExecutionBlockRequestProto request = @@ -118,8 +116,6 @@ public class TajoContainerProxy extends ContainerProxy { tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); } catch (Throwable e) { LOG.error(e.getMessage(), e); - } finally { - RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -160,21 +156,19 @@ public class TajoContainerProxy extends ContainerProxy { containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId)); } - RpcConnectionPool connPool = RpcConnectionPool.getPool(); + RpcClientManager manager = RpcClientManager.getInstance(); NettyClientBase tmClient = null; - try { - ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - - QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.releaseWorkerResource(null, - QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) - .addAllContainerIds(containerIdProtos) - .build(), - NullCallback.get()); - } finally { - connPool.releaseConnection(tmClient); - } + + ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker(); + tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + + QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); + masterClientService.releaseWorkerResource(null, + QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() + .setExecutionBlockId(executionBlockId.getProto()) + .addAllContainerIds(containerIdProtos) + .build(), + NullCallback.get()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 9cbfb95..67dae06 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -41,7 +41,7 @@ import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.NetUtils; @@ -88,7 +88,7 @@ public class QueryMaster extends CompositeService implements EventHandler { private TajoWorker.WorkerContext workerContext; - private RpcConnectionPool connPool; + private RpcClientManager manager; private ExecutorService eventExecutor; @@ -104,7 +104,7 @@ public class QueryMaster extends CompositeService implements EventHandler { } try { this.systemConf = (TajoConf)conf; - this.connPool = RpcConnectionPool.getPool(); + this.manager = RpcClientManager.getInstance(); querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); queryMasterContext = new QueryMasterContext(systemConf); @@ -187,7 +187,7 @@ public class QueryMaster extends CompositeService implements EventHandler { for (WorkerResourceProto worker : workers) { try { TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), + rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); @@ -197,8 +197,6 @@ public class QueryMaster extends CompositeService implements EventHandler { continue; } catch (Exception e) { continue; - } finally { - connPool.releaseConnection(rpc); } } } @@ -211,15 +209,13 @@ public class QueryMaster extends CompositeService implements EventHandler { for (WorkerResourceProto worker : workers) { try { TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), + rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get()); } catch (Exception e) { LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(rpc); } } } @@ -234,7 +230,7 @@ public class QueryMaster extends CompositeService implements EventHandler { // update master address in worker context. ServiceTracker serviceTracker = workerContext.getServiceTracker(); - rpc = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterService = rpc.getStub(); CallFuture callBack = new CallFuture(); @@ -245,8 +241,6 @@ public class QueryMaster extends CompositeService implements EventHandler { return workerResourcesRequest.getWorkerResourcesList(); } catch (Exception e) { LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(rpc); } return new ArrayList(); } @@ -342,7 +336,7 @@ public class QueryMaster extends CompositeService implements EventHandler { NettyClientBase tmClient = null; try { - tmClient = connPool.getConnection(workerContext.getServiceTracker().getUmbilicalAddress(), + tmClient = manager.getClient(workerContext.getServiceTracker().getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); @@ -352,8 +346,6 @@ public class QueryMaster extends CompositeService implements EventHandler { //When tajo do stop cluster, tajo master maybe throw closed connection exception LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); } try { @@ -445,7 +437,7 @@ public class QueryMaster extends CompositeService implements EventHandler { try { ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker(); - tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), + tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 2377720..5ffc7a9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -36,7 +36,7 @@ import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; @@ -79,7 +79,7 @@ public class ExecutionBlockContext { private ExecutionBlockSharedResource resource; private TajoQueryEngine queryEngine; - private RpcConnectionPool connPool; + private RpcClientManager connManager; private InetSocketAddress qmMasterAddr; private WorkerConnectionInfo queryMaster; private TajoConf systemConf; @@ -100,7 +100,7 @@ public class ExecutionBlockContext { ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable { this.manager = manager; this.executionBlockId = executionBlockId; - this.connPool = RpcConnectionPool.getPool(); + this.connManager = RpcClientManager.getInstance(); this.queryMaster = queryMaster; this.systemConf = conf; this.reporter = new Reporter(); @@ -139,12 +139,8 @@ public class ExecutionBlockContext { } catch (Throwable e) { try { NettyClientBase client = getQueryMasterConnection(); - try { - QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); - stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); - } finally { - connPool.releaseConnection(client); - } + QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); + stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); } catch (Throwable t) { //ignore } @@ -158,11 +154,7 @@ public class ExecutionBlockContext { public NettyClientBase getQueryMasterConnection() throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - return connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); - } - - public void releaseConnection(NettyClientBase connection) { - connPool.releaseConnection(connection); + return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true); } public void stop(){ @@ -274,12 +266,8 @@ public class ExecutionBlockContext { private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception { NettyClientBase client = getQueryMasterConnection(); - try { - QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); - stub.doneExecutionBlock(null, reporter, NullCallback.get()); - } finally { - connPool.releaseConnection(client); - } + QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); + stub.doneExecutionBlock(null, reporter, NullCallback.get()); } protected void reportExecutionBlock(ExecutionBlockId ebId) { @@ -405,7 +393,6 @@ public class ExecutionBlockContext { throw new RuntimeException(t); } } finally { - releaseConnection(client); if (remainingRetries > 0 && !reporterStop.get()) { synchronized (reporterThread) { try { http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 6798c33..7ba2ebc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -52,19 +52,14 @@ import org.apache.tajo.querymaster.StageState; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.ApplicationIdUtils; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; public class TajoResourceAllocator extends AbstractResourceAllocator { @@ -203,15 +198,13 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get(PrimitiveProtos.BoolProto.class)); } catch (Throwable e) { LOG.error(e.getMessage(), e); - } finally { - RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -318,17 +311,16 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) .build(); - RpcConnectionPool connPool = RpcConnectionPool.getPool(); + NettyClientBase tmClient = null; try { ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + tmClient = RpcClientManager.getInstance(). + getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); } catch (Throwable e) { LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); } WorkerResourceAllocationResponse response = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index b08af2b..61a05dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -426,50 +426,47 @@ public class Task { context.getHashShuffleAppenderManager().finalizeTask(taskId); NettyClientBase client = executionBlockContext.getQueryMasterConnection(); - try { - QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub(); - if (context.isStopped()) { - context.setExecutorProgress(0.0f); - if (context.getState() == TaskAttemptState.TA_KILLED) { - queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); - executionBlockContext.killedTasksNum.incrementAndGet(); - } else { - context.setState(TaskAttemptState.TA_FAILED); - TaskFatalErrorReport.Builder errorBuilder = - TaskFatalErrorReport.newBuilder() - .setId(getId().getProto()); - if (error != null) { - if (error.getMessage() == null) { - errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); - } else { - errorBuilder.setErrorMessage(error.getMessage()); - } - errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); - } + QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub(); + if (context.isStopped()) { + context.setExecutorProgress(0.0f); - queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); - executionBlockContext.failedTasksNum.incrementAndGet(); - } + if (context.getState() == TaskAttemptState.TA_KILLED) { + queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); + executionBlockContext.killedTasksNum.incrementAndGet(); } else { - // if successful - context.setProgress(1.0f); - context.setState(TaskAttemptState.TA_SUCCEEDED); - executionBlockContext.succeededTasksNum.incrementAndGet(); + context.setState(TaskAttemptState.TA_FAILED); + TaskFatalErrorReport.Builder errorBuilder = + TaskFatalErrorReport.newBuilder() + .setId(getId().getProto()); + if (error != null) { + if (error.getMessage() == null) { + errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); + } else { + errorBuilder.setErrorMessage(error.getMessage()); + } + errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); + } - TaskCompletionReport report = getTaskCompletionReport(); - queryMasterStub.done(null, report, NullCallback.get()); + queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); + executionBlockContext.failedTasksNum.incrementAndGet(); } - finishTime = System.currentTimeMillis(); - LOG.info(context.getTaskId() + " completed. " + - "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + - ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() - + ", killed: " + executionBlockContext.killedTasksNum.intValue() - + ", failed: " + executionBlockContext.failedTasksNum.intValue()); - cleanupTask(); - } finally { - executionBlockContext.releaseConnection(client); + } else { + // if successful + context.setProgress(1.0f); + context.setState(TaskAttemptState.TA_SUCCEEDED); + executionBlockContext.succeededTasksNum.incrementAndGet(); + + TaskCompletionReport report = getTaskCompletionReport(); + queryMasterStub.done(null, report, NullCallback.get()); } + finishTime = System.currentTimeMillis(); + LOG.info(context.getTaskId() + " completed. " + + "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + + ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() + + ", killed: " + executionBlockContext.killedTasksNum.intValue() + + ", failed: " + executionBlockContext.failedTasksNum.intValue()); + cleanupTask(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 058ea42..642c914 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -299,8 +299,6 @@ public class TaskRunner extends AbstractService { } } catch (Throwable t) { LOG.fatal(t.getMessage(), t); - } finally { - executionBlockContext.releaseConnection(client); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index e9f90ca..f8d5fd9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -31,7 +31,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.storage.DiskDeviceInfo; import org.apache.tajo.storage.DiskMountInfo; @@ -54,7 +54,7 @@ public class WorkerHeartbeatService extends AbstractService { private final TajoWorker.WorkerContext context; private TajoConf systemConf; - private RpcConnectionPool connectionPool; + private RpcClientManager connectionManager; private WorkerHeartbeatThread thread; private static final float HDFS_DATANODE_STORAGE_SIZE; @@ -74,7 +74,7 @@ public class WorkerHeartbeatService extends AbstractService { } this.systemConf = (TajoConf) conf; - connectionPool = RpcConnectionPool.getPool(); + this.connectionManager = RpcClientManager.getInstance(); super.serviceInit(conf); } @@ -181,7 +181,7 @@ public class WorkerHeartbeatService extends AbstractService { CallFuture callBack = new CallFuture(); ServiceTracker serviceTracker = context.getServiceTracker(); - rmClient = connectionPool.getConnection(serviceTracker.getResourceTrackerAddress(), + rmClient = connectionManager.getClient(serviceTracker.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub(); resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); @@ -204,8 +204,6 @@ public class WorkerHeartbeatService extends AbstractService { LOG.warn("Heartbeat response is being delayed.", te); } catch (Exception e) { LOG.error(e.getMessage(), e); - } finally { - connectionPool.releaseConnection(rmClient); } try { http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index 4b76c73..f94bd78 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -19,20 +19,15 @@ package org.apache.tajo.worker.rule; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rule.*; import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; -import java.net.InetSocketAddress; - /** * With this rule, Tajo worker will check the connectivity to tajo master server. */ @@ -42,20 +37,11 @@ import java.net.InetSocketAddress; public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule { private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception { - RpcConnectionPool pool = RpcConnectionPool.getPool(); - NettyClientBase masterClient = null; - InetSocketAddress masterAddress = null; - - try { - ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf); - masterClient = pool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - masterClient.getStub(); - } finally { - if (masterClient != null) { - pool.releaseConnection(masterClient); - } - } - + RpcClientManager manager = RpcClientManager.getInstance(); + + ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf); + NettyClientBase masterClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + masterClient.getStub(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java index 3866b09..74c09e5 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java @@ -67,40 +67,6 @@ public class RpcUtils { return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1])); } - public static class Timer { - private long remaining; - private long prev; - public Timer(long timeout) { - this.remaining = timeout; - this.prev = System.currentTimeMillis(); - } - - public boolean isTimedOut() { - return remaining <= 0; - } - - public void elapsed() { - long current = System.currentTimeMillis(); - remaining -= current - prev; - prev = current; - } - - public void interval(long wait) { - if (wait <= 0 || isTimedOut()) { - return; - } - try { - Thread.sleep(Math.min(remaining, wait)); - } catch (Exception ex) { - // ignore - } - } - - public long remaining() { - return remaining; - } - } - // non-blocking lock which passes only a ticket before cleared or removed public static class Scrutineer { http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 3d856ce..8f2c2a1 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -20,17 +20,16 @@ package org.apache.tajo.rpc; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.*; - import io.netty.channel.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.GenericFutureListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; +import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.GenericFutureListener; - import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Map; @@ -53,11 +52,16 @@ public class AsyncRpcClient extends NettyClientBase { */ AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries) throws ClassNotFoundException, NoSuchMethodException { + this(rpcConnectionKey, retries, 0); + } + + AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds) + throws ClassNotFoundException, NoSuchMethodException { super(rpcConnectionKey, retries); stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); rpcChannel = new ProxyRpcChannel(); inboundHandler = new ClientChannelInboundHandler(); - init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds)); } @Override @@ -177,7 +181,7 @@ public class AsyncRpcClient extends NettyClientBase { } @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { + private class ClientChannelInboundHandler extends SimpleChannelInboundHandler { void registerCallback(int seqId, ResponseCallback callback) { @@ -188,25 +192,23 @@ public class AsyncRpcClient extends NettyClientBase { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - if (msg instanceof RpcResponse) { - try { - RpcResponse response = (RpcResponse) msg; - ResponseCallback callback = requests.remove(response.getId()); + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { + ResponseCallback callback = requests.remove(response.getId()); - if (callback == null) { - LOG.warn("Dangling rpc call"); - } else { - callback.run(response); - } - } finally { - ReferenceCountUtil.release(msg); - } + if (callback == null) { + LOG.warn("Dangling rpc call"); + } else { + callback.run(response); } } @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.info("Connection established successfully : " + ctx.channel().remoteAddress()); + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause); @@ -218,9 +220,17 @@ public class AsyncRpcClient extends NettyClientBase { } else { LOG.error("RPC Exception:" + cause.getMessage()); } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + /* If all requests is done and event is triggered, channel will be closed. */ + if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) { + ctx.close(); + LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress()); + } } } }