Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C7CE41823E for ; Tue, 28 Apr 2015 09:28:32 +0000 (UTC) Received: (qmail 13446 invoked by uid 500); 28 Apr 2015 09:28:32 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 13354 invoked by uid 500); 28 Apr 2015 09:28:32 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 13335 invoked by uid 99); 28 Apr 2015 09:28:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Apr 2015 09:28:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 774B0E0551; Tue, 28 Apr 2015 09:28:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhkim@apache.org To: commits@tajo.apache.org Date: Tue, 28 Apr 2015 09:28:33 -0000 Message-Id: <59f5aa1baa884cc39a6eb2825f91d8ef@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] tajo git commit: TAJO-1563: Improve RPC error handling. (jinho) TAJO-1563: Improve RPC error handling. 
(jinho) Closes #554 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2e7d03df Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2e7d03df Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2e7d03df Branch: refs/heads/master Commit: 2e7d03dff9480ecb7a4e4ed09a5578ba618de2ae Parents: 1971d85 Author: Jinho Kim Authored: Tue Apr 28 18:27:05 2015 +0900 Committer: Jinho Kim Committed: Tue Apr 28 18:27:05 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../apache/tajo/client/SessionConnection.java | 12 +- .../java/org/apache/tajo/master/TajoMaster.java | 9 +- .../tajo/master/rm/TajoResourceTracker.java | 8 +- .../apache/tajo/querymaster/QueryMaster.java | 13 +- .../tajo/worker/ExecutionBlockContext.java | 4 +- .../tajo/worker/TajoResourceAllocator.java | 6 +- .../java/org/apache/tajo/worker/TajoWorker.java | 7 + .../java/org/apache/tajo/worker/TaskRunner.java | 14 +- .../tajo/worker/WorkerHeartbeatService.java | 19 +- .../tajo/ws/rs/resources/DatabasesResource.java | 8 +- .../responses/WorkerConnectionInfoResponse.java | 25 +- .../tajo/ws/rs/responses/WorkerResponse.java | 47 ++- .../org/apache/tajo/rpc/RpcChannelFactory.java | 6 +- .../java/org/apache/tajo/rpc/RpcConstants.java | 32 ++ .../org/apache/tajo/rpc/AsyncRpcClient.java | 206 +++------- .../org/apache/tajo/rpc/AsyncRpcServer.java | 65 +-- .../org/apache/tajo/rpc/BlockingRpcClient.java | 214 ++++------ .../org/apache/tajo/rpc/BlockingRpcServer.java | 64 +-- .../java/org/apache/tajo/rpc/CallFuture.java | 32 +- .../apache/tajo/rpc/ChannelEventListener.java | 39 ++ .../tajo/rpc/ConnectionCloseFutureListener.java | 35 -- .../apache/tajo/rpc/DefaultRpcController.java | 2 +- .../apache/tajo/rpc/MonitorClientHandler.java | 101 +++++ .../apache/tajo/rpc/MonitorServerHandler.java | 73 ++++ .../org/apache/tajo/rpc/MonitorStateEvent.java | 49 +++ .../org/apache/tajo/rpc/NettyClientBase.java | 303 
++++++++++++-- .../tajo/rpc/ProtoChannelInitializer.java | 57 --- .../tajo/rpc/ProtoClientChannelInitializer.java | 63 +++ .../org/apache/tajo/rpc/ProtoDeclaration.java | 24 ++ .../tajo/rpc/ProtoServerChannelInitializer.java | 50 +++ .../apache/tajo/rpc/RecoverableException.java | 42 ++ .../org/apache/tajo/rpc/RpcClientManager.java | 123 ++++-- .../src/main/proto/TestProtocol.proto | 3 +- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 350 +++++++++++++---- .../org/apache/tajo/rpc/TestBlockingRpc.java | 392 +++++++++++++------ .../apache/tajo/rpc/TestRpcClientManager.java | 155 ++++++-- .../rpc/test/impl/DummyProtocolAsyncImpl.java | 21 +- .../test/impl/DummyProtocolBlockingImpl.java | 13 +- 39 files changed, 1879 insertions(+), 809 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 7b16e11..93fb092 100644 --- a/CHANGES +++ b/CHANGES @@ -22,6 +22,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1563: Improve RPC error handling. (jinho) + TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa) TAJO-1548: Refactoring condition code for CHAR into CatalogUtil. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index d8152f4..be757af 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -31,15 +31,15 @@ import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.ProtoUtil; -import io.netty.channel.ConnectTimeoutException; - import java.io.Closeable; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; @@ -87,6 +87,10 @@ public class SessionConnection implements Closeable { this.properties = properties; this.manager = RpcClientManager.getInstance(); + this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); + this.manager.setTimeoutSeconds( + properties.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 0)); // disable rpc timeout + this.userInfo = UserRoleInfo.getCurrentUser(); this.baseDatabase = baseDatabase != null ? 
baseDatabase : null; @@ -98,12 +102,12 @@ public class SessionConnection implements Closeable { } public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException, - ConnectTimeoutException, ClassNotFoundException { + ConnectException, ClassNotFoundException { return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); } public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode) - throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { + throws NoSuchMethodException, ConnectException, ClassNotFoundException { return manager.getClient(addr, protocolClass, asyncMode); } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 0a5de58..d6ae49c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -44,6 +44,8 @@ import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.metrics.CatalogMetricsGaugeSet; import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet; import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rule.EvaluationContext; import org.apache.tajo.rule.EvaluationFailedException; import org.apache.tajo.rule.SelfDiagnosisRuleEngine; @@ -167,8 +169,12 @@ public class TajoMaster extends CompositeService { try { RackResolver.init(systemConf); + RpcClientManager rpcManager = RpcClientManager.getInstance(); + rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); + rpcManager.setTimeoutSeconds( 
+ systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS)); + initResourceManager(); - initWebServer(); this.dispatcher = new AsyncDispatcher(); addIfService(dispatcher); @@ -328,6 +334,7 @@ public class TajoMaster extends CompositeService { LOG.error(e.getMessage(), e); } + initWebServer(); initSystemMetrics(); haService = ServiceTrackerFactory.get(systemConf); http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index ba021fc..4f3b66a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -76,7 +76,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource } @Override - public void serviceInit(Configuration conf) { + public void serviceInit(Configuration conf) throws Exception { if (!(conf instanceof TajoConf)) { throw new IllegalArgumentException("Configuration must be a TajoConf instance"); } @@ -98,17 +98,17 @@ public class TajoResourceTracker extends AbstractService implements TajoResource systemConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress)); LOG.info("TajoResourceTracker starts up (" + this.bindAddress + ")"); - super.start(); + super.serviceInit(conf); } @Override - public void serviceStop() { + public void serviceStop() throws Exception { // server can be null if some exception occurs before the rpc server starts up. 
if(server != null) { server.shutdown(); server = null; } - super.stop(); + super.serviceStop(); } /** The response builder */ http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 67dae06..6c5bd22 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -38,10 +38,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.master.event.QueryStopEvent; -import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.NetUtils; @@ -192,10 +189,8 @@ public class QueryMaster extends CompositeService implements EventHandler { TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get()); - } catch (RuntimeException e) { - LOG.warn("Ignoring RuntimeException. " + e.getMessage(), e); - continue; } catch (Exception e) { + LOG.warn("Ignoring exception. 
" + e.getMessage(), e); continue; } } @@ -341,6 +336,7 @@ public class QueryMaster extends CompositeService implements EventHandler { QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.heartbeat(future.getController(), queryHeartbeat, future); + future.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Exception e) { //this function will be closed in new thread. //When tajo do stop cluster, tajo master maybe throw closed connection exception @@ -441,9 +437,8 @@ public class QueryMaster extends CompositeService implements EventHandler { QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - CallFuture callBack = new CallFuture(); TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask); - masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack); + masterClientService.heartbeat(null, queryHeartbeat, NullCallback.get()); } catch (Throwable t) { t.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index fcf787e..0d26e6c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -20,7 +20,6 @@ package org.apache.tajo.worker; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import io.netty.channel.ConnectTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -45,6 +44,7 @@ import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; import 
java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -154,7 +154,7 @@ public class ExecutionBlockContext { } public NettyClientBase getQueryMasterConnection() - throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { + throws NoSuchMethodException, ConnectException, ClassNotFoundException { return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true); } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 7ba2ebc..05dd1a9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -318,7 +318,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { tmClient = RpcClientManager.getInstance(). 
getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.allocateWorkerResources(null, request, callBack); + masterClientService.allocateWorkerResources(callBack.getController(), request, callBack); } catch (Throwable e) { LOG.error(e.getMessage(), e); } @@ -335,8 +335,12 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { } catch (TimeoutException e) { LOG.info("No available worker resource for " + event.getExecutionBlockId()); continue; + } catch (ExecutionException e) { + LOG.error(e.getMessage(), e); + break; } } + int numAllocatedContainers = 0; if(response != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 17af71a..79b83e4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -37,6 +37,8 @@ import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.service.TajoMasterInfo; @@ -152,6 +154,11 @@ public class TajoWorker extends CompositeService { this.systemConf = (TajoConf)conf; RackResolver.init(systemConf); + RpcClientManager rpcManager = RpcClientManager.getInstance(); + rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); + 
rpcManager.setTimeoutSeconds( + systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS)); + serviceTracker = ServiceTrackerFactory.get(systemConf); this.workerContext = new WorkerContext(); http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 642c914..6076913 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -19,7 +19,6 @@ package org.apache.tajo.worker; import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,8 +37,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import io.netty.channel.ConnectTimeoutException; - +import java.net.ConnectException; import java.util.concurrent.*; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; @@ -200,7 +198,7 @@ public class TaskRunner extends AbstractService { NettyClientBase client; try { client = executionBlockContext.getQueryMasterConnection(); - } catch (ConnectTimeoutException ce) { + } catch (ConnectException ce) { // NettyClientBase throws ConnectTimeoutException if connection was failed stop(); getContext().stopTaskRunner(getId()); @@ -238,17 +236,15 @@ public class TaskRunner extends AbstractService { if(stopped) { break; } - - if(callFuture.getController().failed()){ - LOG.error(callFuture.getController().errorText()); - break; - } // if there has been no assigning task for a given period, // TaskRunner will retry to request an assigning task. 
if (LOG.isDebugEnabled()) { LOG.info("Retry assigning task:" + getId()); } continue; + } catch (ExecutionException ee) { + LOG.error(ee.getMessage(), ee); + break; } if (taskRequest != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index f8d5fd9..9afee5a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -19,13 +19,12 @@ package org.apache.tajo.worker; import com.google.common.collect.Lists; -import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto; import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; @@ -187,17 +186,13 @@ public class WorkerHeartbeatService extends AbstractService { resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS); - if(response != null) { - ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary(); - if(clusterResourceSummary.getNumWorkers() > 0) { - context.setNumClusterNodes(clusterResourceSummary.getNumWorkers()); - } - context.setClusterResource(clusterResourceSummary); - } else { - 
if(callBack.getController().failed()) { - throw new ServiceException(callBack.getController().errorText()); - } + + QueryCoordinatorProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary(); + if(clusterResourceSummary.getNumWorkers() > 0) { + context.setNumClusterNodes(clusterResourceSummary.getNumWorkers()); } + context.setClusterResource(clusterResourceSummary); + } catch (InterruptedException e) { break; } catch (TimeoutException te) { http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java index 3868cab..b807198 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java @@ -254,14 +254,18 @@ public class DatabasesResource { if (selectedDatabase != null) { List tablespacesList = catalogService.getAllTablespaces(); TablespaceProto selectedTablespace = null; - + for (TablespaceProto tablespace: tablespacesList) { if (tablespace.hasId() && tablespace.getId() == selectedDatabase.getSpaceId()) { selectedTablespace = tablespace; break; } } - + + if(selectedTablespace == null) { + return ResourcesUtil.createExceptionResponse(LOG, "Tablespace not found."); + } + DatabaseInfoResponse databaseInfo = new DatabaseInfoResponse(); databaseInfo.setId(selectedDatabase.getId()); databaseInfo.setName(selectedDatabase.getName()); http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java index 3c8cb8b..67e59ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java @@ -52,16 +52,27 @@ public class WorkerConnectionInfoResponse { return id; } - public void setId(int id) { - this.id = id; - } - public String getHost() { return host; } - public void setHost(String host) { - this.host = host; + public int getPeerRpcPort() { + return peerRpcPort; + } + + public int getPullServerPort() { + return pullServerPort; + } + + public int getQueryMasterPort() { + return queryMasterPort; + } + + public int getClientPort() { + return clientPort; + } + + public int getHttpInfoPort() { + return httpInfoPort; } - } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java index 84b81f8..ff0399c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java @@ -69,8 +69,51 @@ public class WorkerResponse { return connectionInfo; } - public void setConnectionInfo(WorkerConnectionInfoResponse connectionInfo) { - this.connectionInfo = connectionInfo; + public int getCpuCoreSlots() { + return cpuCoreSlots; } + public int getMemoryMB() { + return memoryMB; + } + + public float getUsedDiskSlots() { + return usedDiskSlots; + } + + public int getUsedMemoryMB() { + return usedMemoryMB; + } + + public int getUsedCpuCoreSlots() { + return 
usedCpuCoreSlots; + } + + public long getMaxHeap() { + return maxHeap; + } + + public long getFreeHeap() { + return freeHeap; + } + + public long getTotalHeap() { + return totalHeap; + } + + public int getNumRunningTasks() { + return numRunningTasks; + } + + public int getNumQueryMasterTasks() { + return numQueryMasterTasks; + } + + public long getLastHeartbeatTime() { + return lastHeartbeatTime; + } + + public float getDiskSlots() { + return diskSlots; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java index ed6b634..eb34ca2 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java @@ -19,13 +19,11 @@ package org.apache.tajo.rpc; import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.util.Map; import java.util.Queue; http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java new file mode 100644 index 0000000..165cd2a --- /dev/null +++ 
b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +public class RpcConstants { + + public static final String PING_PACKET = "TAJO"; + public static final String RPC_CLIENT_RETRY_MAX = "tajo.rpc.client.retry.max"; + public static final String RPC_CLIENT_TIMEOUT_SECS = "tajo.rpc.client.timeout-secs"; + + public static final int DEFAULT_RPC_RETRIES = 3; + public static final int DEFAULT_RPC_TIMEOUT_SECONDS = 180; + public static final int DEFAULT_CONNECT_TIMEOUT = 60000; // 60 sec + public static final int DEFAULT_PAUSE = 1000; // 1 sec + public static final int DEFAULT_FUTURE_TIMEOUT_SECONDS = 10; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 8f2c2a1..dd7d495 100644 --- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -20,120 +20,97 @@ package org.apache.tajo.rpc; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.*; -import io.netty.channel.*; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.concurrent.GenericFutureListener; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import io.netty.channel.ChannelHandler; import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; -import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; -public class AsyncRpcClient extends NettyClientBase { - private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - - private final ConcurrentMap requests = - new ConcurrentHashMap(); +public class AsyncRpcClient extends NettyClientBase { private final Method stubMethod; private final ProxyRpcChannel rpcChannel; - private final ClientChannelInboundHandler inboundHandler; + private final NettyChannelInboundHandler handler; - /** - * Intentionally make this method package-private, avoiding user directly - * new an instance through this constructor. 
- */ AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries) throws ClassNotFoundException, NoSuchMethodException { - this(rpcConnectionKey, retries, 0); + this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false); } - AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds) + /** + * Intentionally make this method package-private, avoiding user directly + * new an instance through this constructor. + * + * @param rpcConnectionKey + * @param retries retry operation number of times + * @param timeout disable ping, it trigger timeout event on idle-state. + * otherwise it is request timeout on active-state + * @param timeUnit TimeUnit + * @param enablePing enable to detect remote peer hangs + * @throws ClassNotFoundException + * @throws NoSuchMethodException + */ + AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing) throws ClassNotFoundException, NoSuchMethodException { super(rpcConnectionKey, retries); - stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); - rpcChannel = new ProxyRpcChannel(); - inboundHandler = new ClientChannelInboundHandler(); - init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds)); + + this.stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); + this.rpcChannel = new ProxyRpcChannel(); + this.handler = new ClientChannelInboundHandler(); + init(new ProtoClientChannelInitializer(handler, + RpcResponse.getDefaultInstance(), + timeUnit.toNanos(timeout), + enablePing)); } @Override - public T getStub() { + public I getStub() { return getStub(stubMethod, rpcChannel); } - protected void sendExceptions(String message) { - for(Map.Entry callbackEntry: requests.entrySet()) { - ResponseCallback callback = callbackEntry.getValue(); - Integer id = callbackEntry.getKey(); - - RpcResponse.Builder responseBuilder = RpcResponse.newBuilder() - .setErrorMessage(message) - 
.setId(id); - - callback.run(responseBuilder.build()); - } - } - @Override - public void close() { - sendExceptions("AsyncRpcClient terminates all the connections"); - - super.close(); + protected NettyChannelInboundHandler getHandler() { + return handler; } private class ProxyRpcChannel implements RpcChannel { + private final AtomicInteger sequence = new AtomicInteger(0); + public void callMethod(final MethodDescriptor method, final RpcController controller, final Message param, final Message responseType, - RpcCallback done) { + final RpcCallback done) { int nextSeqId = sequence.getAndIncrement(); + RpcProtos.RpcRequest rpcRequest = buildRequest(nextSeqId, method, param); - Message rpcRequest = buildRequest(nextSeqId, method, param); - - inboundHandler.registerCallback(nextSeqId, - new ResponseCallback(controller, responseType, done)); - - ChannelPromise channelPromise = getChannel().newPromise(); - channelPromise.addListener(new GenericFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); - } - } - }); - getChannel().writeAndFlush(rpcRequest, channelPromise); + invoke(rpcRequest, new ResponseCallback(controller, responseType, done), 0); } + } - private Message buildRequest(int seqId, - MethodDescriptor method, - Message param) { + @ChannelHandler.Sharable + private class ClientChannelInboundHandler extends NettyChannelInboundHandler { - RpcRequest.Builder requestBuilder = RpcRequest.newBuilder() - .setId(seqId) - .setMethodName(method.getName()); + @Override + protected void run(RpcResponse response, ResponseCallback callback) throws Exception { + callback.run(response); + } - if (param != null) { - requestBuilder.setRequestMessage(param.toByteString()); - } + @Override + protected void handleException(int requestId, ResponseCallback callback, String message) { + RpcResponse.Builder responseBuilder = 
RpcResponse.newBuilder() + .setErrorMessage(message + "") + .setId(requestId); - return requestBuilder.build(); + callback.run(responseBuilder.build()); } } - private class ResponseCallback implements RpcCallback { + static class ResponseCallback implements RpcCallback { private final RpcController controller; private final Message responsePrototype; private final RpcCallback callback; @@ -149,88 +126,27 @@ public class AsyncRpcClient extends NettyClientBase { @Override public void run(RpcResponse rpcResponse) { // if hasErrorMessage is true, it means rpc-level errors. - // it does not call the callback function\ + // it can be called the callback function with null response. if (rpcResponse.hasErrorMessage()) { if (controller != null) { this.controller.setFailed(rpcResponse.getErrorMessage()); } callback.run(null); } else { // if rpc call succeed - try { - Message responseMessage; - if (!rpcResponse.hasResponseMessage()) { - responseMessage = null; - } else { + + Message responseMessage = null; + if (rpcResponse.hasResponseMessage()) { + + try { responseMessage = responsePrototype.newBuilderForType().mergeFrom( rpcResponse.getResponseMessage()).build(); + } catch (InvalidProtocolBufferException e) { + if (controller != null) { + this.controller.setFailed(e.getMessage()); + } } - - callback.run(responseMessage); - - } catch (InvalidProtocolBufferException e) { - throw new RemoteException(getErrorMessage(""), e); - } - } - } - } - - private String getErrorMessage(String message) { - return "Exception [" + protocol.getCanonicalName() + - "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) - getChannel().remoteAddress()) + ")]: " + message; - } - - @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends SimpleChannelInboundHandler { - - void registerCallback(int seqId, ResponseCallback callback) { - - if (requests.putIfAbsent(seqId, callback) != null) { - throw new RemoteException( - getErrorMessage("Duplicate Sequence Id "+ seqId)); - } 
- } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { - ResponseCallback callback = requests.remove(response.getId()); - - if (callback == null) { - LOG.warn("Dangling rpc call"); - } else { - callback.run(response); - } - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - LOG.info("Connection established successfully : " + ctx.channel().remoteAddress()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause); - - sendExceptions(cause.getMessage()); - - if(LOG.isDebugEnabled()) { - LOG.error(cause.getMessage(), cause); - } else { - LOG.error("RPC Exception:" + cause.getMessage()); - } - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent e = (IdleStateEvent) evt; - /* If all requests is done and event is triggered, channel will be closed. 
*/ - if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) { - ctx.close(); - LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress()); } + callback.run(responseMessage); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index e4109fe..22f47b0 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -24,6 +24,7 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; import io.netty.channel.*; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; @@ -52,7 +53,7 @@ public class AsyncRpcServer extends NettyServerBase { Method method = serviceClass.getMethod("newReflectiveService", interfaceClass); this.service = (Service) method.invoke(null, instance); - this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); + this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); super.init(this.initializer, workerNum); } @@ -81,53 +82,67 @@ public class AsyncRpcServer extends NettyServerBase { protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) throws Exception { String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + final MethodDescriptor methodDescriptor = 
service.getDescriptorForType().findMethodByName(methodName); if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + exceptionCaught(ctx, new RemoteCallException(request.getId(), new NoSuchMethodException(methodName))); + return; } - Message paramProto = null; - if (request.hasRequestMessage()) { - try { + try { + Message paramProto = null; + if (request.hasRequestMessage()) { paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() .mergeFrom(request.getRequestMessage()).build(); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); } - } - final RpcController controller = new NettyRpcController(); + final RpcController controller = new NettyRpcController(); - RpcCallback callback = !request.hasId() ? null : new RpcCallback() { + final RpcCallback callback = !request.hasId() ? null : new RpcCallback() { - public void run(Message returnValue) { + public void run(Message returnValue) { - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } - ctx.writeAndFlush(builder.build()); - } - }; + ctx.writeAndFlush(builder.build()); + } + }; - service.callMethod(methodDescriptor, controller, paramProto, callback); + service.callMethod(methodDescriptor, controller, paramProto, callback); + } catch (RemoteCallException e) { + exceptionCaught(ctx, e); + } catch (Throwable throwable) { + exceptionCaught(ctx, new RemoteCallException(request.getId(), methodDescriptor, throwable)); + } } @Override public 
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception{ + throws Exception { if (cause instanceof RemoteCallException) { RemoteCallException callException = (RemoteCallException) cause; ctx.writeAndFlush(callException.getResponse()); + + if(LOG.isDebugEnabled()) { + Throwable rootCause = ExceptionUtils.getRootCause(cause); + LOG.error(ExceptionUtils.getMessage(rootCause), rootCause); + } } else { - LOG.error(cause.getMessage()); + /* unhandled exception. */ + if (ctx.channel().isOpen()) { + /* client can be triggered channelInactiveEvent */ + ctx.close(); + } + Throwable rootCause = ExceptionUtils.getRootCause(cause); + LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index ad536a4..349a0a0 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -18,71 +18,68 @@ package org.apache.tajo.rpc; -import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import io.netty.channel.*; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.concurrent.GenericFutureListener; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import io.netty.channel.ChannelHandler; import 
org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; -import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; import java.lang.reflect.Method; import java.net.InetSocketAddress; -import java.util.Map; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; -public class BlockingRpcClient extends NettyClientBase { - private static final Log LOG = LogFactory.getLog(RpcProtos.class); - - private final Map requests = - new ConcurrentHashMap(); +public class BlockingRpcClient extends NettyClientBase { private final Method stubMethod; private final ProxyRpcChannel rpcChannel; - private final ChannelInboundHandlerAdapter inboundHandler; + private final NettyChannelInboundHandler handler; - /** - * Intentionally make this method package-private, avoiding user directly - * new an instance through this constructor. - */ BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries) throws NoSuchMethodException, ClassNotFoundException { - this(rpcConnectionKey, retries, 0); + this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false); } - BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds) - throws ClassNotFoundException, NoSuchMethodException { + /** + * Intentionally make this method package-private, avoiding user directly + * new an instance through this constructor. + * + * @param rpcConnectionKey + * @param retries retry operation number of times + * @param timeout disable ping, it trigger timeout event on idle-state. 
+ * otherwise it is request timeout on active-state + * @param timeUnit TimeUnit + * @param enablePing enable to detect remote peer hangs + * @throws ClassNotFoundException + * @throws NoSuchMethodException + */ + BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, + boolean enablePing) throws ClassNotFoundException, NoSuchMethodException { super(rpcConnectionKey, retries); - stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); - rpcChannel = new ProxyRpcChannel(); - inboundHandler = new ClientChannelInboundHandler(); - init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds)); + + this.stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); + this.rpcChannel = new ProxyRpcChannel(); + this.handler = new ClientChannelInboundHandler(); + init(new ProtoClientChannelInitializer(handler, + RpcResponse.getDefaultInstance(), + timeUnit.toNanos(timeout), + enablePing)); } @Override - public T getStub() { + public I getStub() { return getStub(stubMethod, rpcChannel); } @Override - public void close() { - for(ProtoCallFuture callback: requests.values()) { - callback.setFailed("BlockingRpcClient terminates all the connections", - new ServiceException("BlockingRpcClient terminates all the connections")); - } - requests.clear(); - super.close(); + protected NettyChannelInboundHandler getHandler() { + return handler; } private class ProxyRpcChannel implements BlockingRpcChannel { + private final AtomicInteger sequence = new AtomicInteger(0); + @Override public Message callBlockingMethod(final MethodDescriptor method, final RpcController controller, @@ -91,136 +88,66 @@ public class BlockingRpcClient extends NettyClientBase { throws TajoServiceException { int nextSeqId = sequence.getAndIncrement(); + RpcProtos.RpcRequest rpcRequest = buildRequest(nextSeqId, method, param); + ProtoCallFuture callFuture = new 
ProtoCallFuture(controller, responsePrototype); - Message rpcRequest = buildRequest(nextSeqId, method, param); - - ProtoCallFuture callFuture = - new ProtoCallFuture(controller, responsePrototype); - requests.put(nextSeqId, callFuture); - - ChannelPromise channelPromise = getChannel().newPromise(); - channelPromise.addListener(new GenericFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); - } - } - }); - getChannel().writeAndFlush(rpcRequest, channelPromise); + invoke(rpcRequest, callFuture, 0); try { - return callFuture.get(60, TimeUnit.SECONDS); + return callFuture.get(); } catch (Throwable t) { if (t instanceof ExecutionException) { Throwable cause = t.getCause(); if (cause != null && cause instanceof TajoServiceException) { - throw (TajoServiceException)cause; + throw (TajoServiceException) cause; } } throw new TajoServiceException(t.getMessage()); } } - - private Message buildRequest(int seqId, - MethodDescriptor method, - Message param) { - RpcRequest.Builder requestBuilder = RpcRequest.newBuilder() - .setId(seqId) - .setMethodName(method.getName()); - - if (param != null) { - requestBuilder.setRequestMessage(param.toByteString()); - } - - return requestBuilder.build(); - } - } - - private String getErrorMessage(String message) { - if(getChannel() != null) { - return protocol.getName() + - "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) - getChannel().remoteAddress()) + "): " + message; - } else { - return "Exception " + message; - } } private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) { - if(getChannel() != null) { - return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(), - RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress())); + if (getChannel() != null) { + return new 
TajoServiceException(response.getErrorMessage(), cause, getKey().protocolClass.getName(), + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) getChannel().remoteAddress())); } else { return new TajoServiceException(response.getErrorMessage()); } } @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends SimpleChannelInboundHandler { + public class ClientChannelInboundHandler extends NettyChannelInboundHandler { @Override - protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception { - ProtoCallFuture callback = requests.remove(rpcResponse.getId()); - - if (callback == null) { - LOG.warn("Dangling rpc call"); + protected void run(RpcResponse rpcResponse, ProtoCallFuture callback) throws Exception { + if (rpcResponse.hasErrorMessage()) { + callback.setFailed(rpcResponse.getErrorMessage(), + makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); } else { - if (rpcResponse.hasErrorMessage()) { - callback.setFailed(rpcResponse.getErrorMessage(), - makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); - } else { - Message responseMessage; - - if (!rpcResponse.hasResponseMessage()) { - responseMessage = null; - } else { + Message responseMessage = null; + + if (rpcResponse.hasResponseMessage()) { + try { responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) .build(); + } catch (InvalidProtocolBufferException e) { + callback.setFailed(e.getMessage(), e); } - - callback.setResponse(responseMessage); } + callback.setResponse(responseMessage); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - /* Current requests will be failed */ - for(ProtoCallFuture callback: requests.values()) { - callback.setFailed(cause.getMessage(), cause); - } - requests.clear(); - - if(LOG.isDebugEnabled()) { - LOG.error("" + cause.getMessage(), cause); - } 
else { - LOG.error("RPC Exception:" + cause.getMessage()); - } - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - LOG.info("Connection established successfully : " + ctx.channel().remoteAddress()); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent e = (IdleStateEvent) evt; - /* If all requests is done and event is triggered, channel will be closed. */ - if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) { - ctx.close(); - LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress()); - } - } + protected void handleException(int requestId, ProtoCallFuture callback, String message) { + callback.setFailed(message + "", new TajoServiceException(message)); } } - static class ProtoCallFuture implements Future { + static class ProtoCallFuture implements Future { private Semaphore sem = new Semaphore(0); + private boolean done = false; private Message response = null; private Message returnType; @@ -240,8 +167,9 @@ public class BlockingRpcClient extends NettyClientBase { @Override public Message get() throws InterruptedException, ExecutionException { - sem.acquire(); - if(ee != null) { + if(!isDone()) sem.acquire(); + + if (ee != null) { throw ee; } return response; @@ -250,14 +178,16 @@ public class BlockingRpcClient extends NettyClientBase { @Override public Message get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if(sem.tryAcquire(timeout, unit)) { - if (ee != null) { - throw ee; + if(!isDone()) { + if (!sem.tryAcquire(timeout, unit)) { + throw new TimeoutException(); } - return response; - } else { - throw new TimeoutException(); } + + if (ee != null) { + throw ee; + } + return response; } @Override @@ -267,19 +197,21 @@ public class BlockingRpcClient extends NettyClientBase { @Override public boolean 
isDone() { - return sem.availablePermits() > 0; + return done; } public void setResponse(Message response) { this.response = response; + done = true; sem.release(); } public void setFailed(String errorText, Throwable t) { - if(controller != null) { + if (controller != null) { this.controller.setFailed(errorText); } ee = new ExecutionException(errorText, t); + done = true; sem.release(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java index bb31367..93c28e3 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -23,10 +23,10 @@ import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcController; import io.netty.channel.*; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; -import org.apache.tajo.rpc.RpcProtos.RpcResponse; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -53,7 +53,7 @@ public class BlockingRpcServer extends NettyServerBase { "newReflectiveBlockingService", interfaceClass); this.service = (BlockingService) method.invoke(null, instance); - this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); + this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); super.init(this.initializer, workerNum); } @@ -80,43 +80,40 @@ public class BlockingRpcServer extends 
NettyServerBase { } @Override - protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { + protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) throws Exception { String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - + final MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + exceptionCaught(ctx, new RemoteCallException(request.getId(), new NoSuchMethodException(methodName))); + return; } - Message paramProto = null; - if (request.hasRequestMessage()) { - try { + + try { + Message paramProto = null; + if (request.hasRequestMessage()) { paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() .mergeFrom(request.getRequestMessage()).build(); - - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); } - } - Message returnValue; - RpcController controller = new NettyRpcController(); - try { - returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } + RpcController controller = new NettyRpcController(); + Message returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + RpcProtos.RpcResponse.Builder builder = RpcProtos.RpcResponse.newBuilder().setId(request.getId()); - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); + if (controller.failed()) 
{ + builder.setErrorMessage(controller.errorText()); + } + ctx.writeAndFlush(builder.build()); + } catch (RemoteCallException e) { + exceptionCaught(ctx, e); + } catch (Throwable throwable) { + exceptionCaught(ctx, new RemoteCallException(request.getId(), methodDescriptor, throwable)); } - ctx.writeAndFlush(builder.build()); } @Override @@ -124,6 +121,19 @@ public class BlockingRpcServer extends NettyServerBase { if (cause instanceof RemoteCallException) { RemoteCallException callException = (RemoteCallException) cause; ctx.writeAndFlush(callException.getResponse()); + + if(LOG.isDebugEnabled()) { + Throwable rootCause = ExceptionUtils.getRootCause(cause); + LOG.debug(ExceptionUtils.getMessage(rootCause), rootCause); + } + } else { + /* unhandled exception. */ + if (ctx.channel().isOpen()) { + /* client can be triggered channelInactiveEvent */ + ctx.close(); + } + Throwable rootCause = ExceptionUtils.getRootCause(cause); + LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java index c4c3256..dee0ff4 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java @@ -20,11 +20,9 @@ package org.apache.tajo.rpc; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; public class CallFuture implements RpcCallback, Future 
{ @@ -66,19 +64,29 @@ public class CallFuture implements RpcCallback, Future { } @Override - public T get() throws InterruptedException { - sem.acquire(); + public T get() throws InterruptedException, ExecutionException { + if (!isDone()) + sem.acquire(); + throwIfFailed(); return response; } @Override - public T get(long timeout, TimeUnit unit) - throws InterruptedException, TimeoutException { - if (sem.tryAcquire(timeout, unit)) { - return response; - } else { - throw new TimeoutException(); + public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { + if (!isDone()) { + if (!sem.tryAcquire(timeout, unit)) { + throw new TimeoutException(); + } + } + + throwIfFailed(); + return response; + } + + private void throwIfFailed() throws ExecutionException { + if (controller.failed()) { + throw new ExecutionException(new ServiceException(controller.errorText())); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java new file mode 100644 index 0000000..e0f2c0d --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.channel.ChannelHandlerContext; + +/** + * Event listener for netty code. Users can subscribe events by using this interface. + */ +public interface ChannelEventListener { + + /** + * Performs actions before channel open. + * @param ctx + */ + void channelRegistered(ChannelHandlerContext ctx); + + /** + * Performs actions after channel close + * @param ctx + */ + void channelUnregistered(ChannelHandlerContext ctx); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java deleted file mode 100644 index 29c9772..0000000 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; - -public class ConnectionCloseFutureListener implements GenericFutureListener { - private RpcClientManager.RpcConnectionKey key; - - public ConnectionCloseFutureListener(RpcClientManager.RpcConnectionKey key) { - this.key = key; - } - - @Override - public void operationComplete(Future future) throws Exception { - RpcClientManager.remove(key); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java index 4ba19a5..47125e4 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java @@ -28,7 +28,7 @@ public class DefaultRpcController implements RpcController { @Override public void reset() { - errorText = ""; + errorText = null; error = false; canceled = false; } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java ---------------------------------------------------------------------- diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java new file mode 100644 index 0000000..441c78a --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.ReferenceCountUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.nio.charset.Charset; + +/** + * MonitorClientHandler is a packet sender for detecting server hangs: it sends a ping packet whenever the channel becomes writer-idle. + * Triggers a {@link MonitorStateEvent} when a remote peer has not responded.
+ */ + +public class MonitorClientHandler extends ChannelInboundHandlerAdapter { + private static final Log LOG = LogFactory.getLog(MonitorClientHandler.class); + + private ByteBuf ping; + private boolean enableMonitor; /* set in channelActive only when an IdleStateHandler with a positive writer-idle time is in the pipeline; ping handling is skipped otherwise */ + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Initialize the message. + ping = ctx.alloc().buffer(RpcConstants.PING_PACKET.length()) + .writeBytes(RpcConstants.PING_PACKET.getBytes(Charset.defaultCharset())); /* NOTE(review): the ping bytes depend on this JVM's default charset (MonitorServerHandler encodes the same way), so detection assumes both peers encode PING_PACKET to identical bytes -- consider a fixed charset such as StandardCharsets.US_ASCII; TODO confirm PING_PACKET is ASCII */ + IdleStateHandler handler = ctx.pipeline().get(IdleStateHandler.class); + if(handler != null && handler.getWriterIdleTimeInMillis() > 0) { + enableMonitor = true; + } + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ping.release(); /* NOTE(review): assumes channelActive already ran; ping would still be null otherwise -- confirm this ordering is guaranteed */ + super.channelInactive(ctx); + } + + private boolean isPing(Object msg) { /* NOTE(review): matches only when a single inbound ByteBuf holds exactly the ping bytes; a ping reply coalesced with other data would be forwarded instead -- confirm the pipeline framing makes this safe */ + if(msg instanceof ByteBuf){ + return ByteBufUtil.equals(ping.duplicate(), ((ByteBuf)msg).duplicate()); + } + return false; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if(enableMonitor && isPing(msg)){ + //ignore ping response: it is released here and not propagated to later handlers + if(LOG.isDebugEnabled()) { + LOG.debug("received ping " + ctx.channel()); + } + ReferenceCountUtil.release(msg); + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (enableMonitor && evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.READER_IDLE && !e.isFirst()) { /* the first reader-idle event is skipped, presumably to give the peer another idle period before declaring expiry */ + /* trigger expired event */ + LOG.info("Server has not respond " + ctx.channel()); + ctx.fireUserEventTriggered(MonitorStateEvent.MONITOR_EXPIRED_STATE_EVENT); + } else if (e.state() == IdleState.WRITER_IDLE) { + /* send ping packet to remote server */ + if(LOG.isDebugEnabled()){ + LOG.debug("sending ping request " + ctx.channel()); + } + ctx.writeAndFlush(ping.duplicate().retain()); /* duplicate() leaves the shared ping buffer's indices untouched; retain() presumably offsets the release performed by the write */ + } + } +
+ super.userEventTriggered(ctx, evt); /* every event, including handled idle events, is still passed on to the next handler */ + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java new file mode 100644 index 0000000..e4333f4 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.nio.charset.Charset; + +/** + * MonitorServerHandler is a packet receiver for detecting server hangs. + * Replies to the remote peer by echoing back every ping packet it sends.
+ */ + +public class MonitorServerHandler extends ChannelInboundHandlerAdapter { + private static final Log LOG = LogFactory.getLog(MonitorServerHandler.class); + + private ByteBuf ping; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Initialize the message. + ping = ctx.alloc().directBuffer(4).writeBytes(RpcConstants.PING_PACKET.getBytes(Charset.defaultCharset())); /* NOTE(review): initial capacity 4 is hard-coded while MonitorClientHandler sizes its buffer with PING_PACKET.length() -- keep the two consistent; the platform default charset is used here too, so both peers must encode PING_PACKET to identical bytes (consider StandardCharsets.US_ASCII) */ + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ping.release(); /* NOTE(review): assumes channelActive already ran; ping would still be null otherwise -- confirm this ordering is guaranteed */ + super.channelInactive(ctx); + } + + private boolean isPing(Object msg) { + if (msg instanceof ByteBuf) { + return ByteBufUtil.equals(ping.duplicate(), ((ByteBuf) msg).duplicate()); + } + + return false; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (isPing(msg)) { + /* reply to client: echo the received ping buffer back unchanged */ + if(LOG.isDebugEnabled()){ + LOG.debug("reply to " + ctx.channel()); + } + ctx.writeAndFlush(msg); /* msg is handed to the write rather than released here */ + } else { + super.channelRead(ctx, msg); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java new file mode 100644 index 0000000..294be3b --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +/** + * A user event fired by {@link org.apache.tajo.rpc.MonitorClientHandler} when a remote peer has not responded to ping packets. + */ +public final class MonitorStateEvent { + /** + * An {@link Enum} that represents the monitor state of a remote peer. + */ + public enum MonitorState { + /** + * No response to ping packets was received from the remote peer. + */ + PING_EXPIRED + } + + /* shared instance; safe to reuse because the only field is final and the constructor is private */ public static final MonitorStateEvent MONITOR_EXPIRED_STATE_EVENT = new MonitorStateEvent(MonitorState.PING_EXPIRED); + + private final MonitorState state; + + private MonitorStateEvent(MonitorState state) { + this.state = state; + } + + /** + * Returns the monitor state. + */ + public MonitorState state() { + return state; + } +} \ No newline at end of file