Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 34D6917CB0 for ; Fri, 6 Nov 2015 10:50:57 +0000 (UTC) Received: (qmail 84718 invoked by uid 500); 6 Nov 2015 10:50:57 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 84656 invoked by uid 500); 6 Nov 2015 10:50:56 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 84054 invoked by uid 99); 6 Nov 2015 10:50:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Nov 2015 10:50:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9DB67E0216; Fri, 6 Nov 2015 10:50:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sathwik@apache.org To: commits@ode.apache.org Date: Fri, 06 Nov 2015 10:51:20 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [25/30] ode git commit: Cluster Enabled Simple Scheduler-2 Cluster Enabled Simple Scheduler-2 Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/3f5ef53a Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/3f5ef53a Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/3f5ef53a Branch: refs/heads/ODE-563 Commit: 3f5ef53ab9f248d3f443196bd96de6507ad94148 Parents: 15f1883 Author: suba Authored: Tue Jul 21 00:21:05 2015 +0530 Committer: suba Committed: Tue Jul 21 00:21:05 2015 +0530 ---------------------------------------------------------------------- Rakefile | 2 +- .../java/org/apache/ode/axis2/ODEServer.java | 5 +- .../apache/ode/bpel/clapi/ClusterManager.java | 17 ++ .../ode/bpel/clapi/ClusterMemberListener.java | 29 +++ .../hazelcast/HazelcastClusterImpl.java | 36 ++-- .../ode/scheduler/simple/SchedulerListener.java | 27 --- .../ode/scheduler/simple/SimpleScheduler.java | 185 +++++++++---------- 7 files changed, 157 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/Rakefile ---------------------------------------------------------------------- diff --git a/Rakefile b/Rakefile index 7c0fa67..5475227 100644 --- a/Rakefile +++ b/Rakefile @@ -208,7 +208,7 @@ define "ode" do desc "ODE Clustering" define "clustering" do - compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging + compile.with projects("bpel-api","bpel-store","scheduler-simple"),HAZELCAST, COMMONS.logging package :jar end http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index 6803350..222fedd 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -50,7 +50,6 @@ import org.apache.ode.store.ClusterProcessStoreImpl; import org.apache.ode.store.ProcessStoreImpl; import org.apache.ode.utils.GUID; import org.apache.ode.utils.fs.TempFileManager; -import org.omg.CORBA.StringHolder; import javax.servlet.ServletConfig; import javax.servlet.ServletException; @@ -198,7 +197,7 @@ public class ODEServer { _store.loadAll(); if (_clusterManager != null) { _clusterManager.registerClusterProcessStoreMessageListener(); - _clusterManager.setScheduler(_scheduler); + _clusterManager.registerClusterMemberListener(_scheduler); } try { @@ -489,7 +488,7 @@ public class ODEServer { Class clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName); _clusterManager = (ClusterManager) clusterImplClass.newInstance(); } catch (Exception ex) { - __log.error("Error while loading class : " +clusterImplName ,ex); + __log.error("Error while loading class : " + clusterImplName, ex); } _clusterManager.init(_configRoot); } http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java index 70d7c03..a00959a 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -19,6 +19,7 @@ package org.apache.ode.bpel.clapi; import java.io.File; +import java.util.List; public interface ClusterManager { @@ -57,6 +58,12 @@ public interface ClusterManager { void registerClusterProcessStoreMessageListener(); /** + * Register Scheduler as ClusterMemberListener + * @param scheduler + */ + void registerClusterMemberListener(Object scheduler); + + /** * Return deployment lock for cluster */ ClusterLock getDeploymentLock(); @@ -65,4 +72,14 @@ public interface ClusterManager { * Return instance lock for cluster */ ClusterLock getInstanceLock(); + + /** + * Return active node list in the cluster + */ + List getActiveNodes(); + + /** + * Return local member's uuid in the cluster + */ + String getUuid(); } http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java new file mode 100644 index 0000000..4225f7d --- /dev/null +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.bpel.clapi; + +public interface ClusterMemberListener { + + void memberAdded(String nodeId); + + void memberRemoved(String nodeId); + + void memberElectedAsMaster(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 63a889a..971df3e 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -30,7 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.clapi.*; -import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.scheduler.simple.SimpleScheduler; /** * This class implements necessary methods to build the cluster using hazelcast @@ -47,7 +47,7 @@ public class HazelcastClusterImpl implements ClusterManager { private IMap instance_lock_map; private ITopic clusterMessageTopic; private ClusterProcessStore _clusterProcessStore; - private Scheduler _scheduler; + private SimpleScheduler _scheduler; private ClusterLock _hazelcastDeploymentLock; private ClusterLock _hazelcastInstanceLock; @@ -77,7 +77,6 @@ public class HazelcastClusterImpl implements ClusterManager { nodeID = localMember.getInetSocketAddress().getHostName() +":" +localMember.getInetSocketAddress().getPort(); uuid = localMember.getUuid(); __log.info("Registering HZ localMember ID " + nodeID); - markAsMaster(); deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK); instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK); @@ -93,15 +92,15 @@ public class HazelcastClusterImpl implements ClusterManager { public void memberAdded(MembershipEvent membershipEvent) { String nodeId = membershipEvent.getMember().getUuid(); __log.info("Member Added " +nodeId); - if(isMaster) _simpleScheduler.memberAdded(nodeId); + _scheduler.memberAdded(nodeId); } @Override public void memberRemoved(MembershipEvent membershipEvent) { - String nodeId = membershipEvent.getMember().getUuid(); - __log.info("Member Removed " +nodeId); + String nodeId = membershipEvent.getMember().getUuid(); + __log.info("Member Removed " + nodeId); markAsMaster(); - if(isMaster) _simpleScheduler.memberRemoved(nodeId, uuid); + _scheduler.memberRemoved(nodeId); } @Override @@ -149,9 +148,9 @@ public class HazelcastClusterImpl implements ClusterManager { private void markAsMaster() { leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); - if (leader.localMember()) { + if (leader.localMember() && isMaster == false) { isMaster = true; - _simpleScheduler.setIsMasterNode(true); + _scheduler.memberElectedAsMaster(); } __log.info(isMaster); } @@ -168,15 +167,16 @@ public class HazelcastClusterImpl implements ClusterManager { _clusterProcessStore = store; } - public void setScheduler(Scheduler scheduler) { - _scheduler = scheduler; - _scheduler.setClusterManager(this); - } - public void registerClusterProcessStoreMessageListener() { clusterMessageTopic.addMessageListener(new ClusterMessageListener()); } + public void registerClusterMemberListener(Object scheduler) { + _scheduler = (SimpleScheduler) scheduler; + markAsMaster(); + _scheduler.setClusterManager(this); + } + public void shutdown() { if(_hazelcastInstance != null) _hazelcastInstance.getLifecycleService().shutdown(); } @@ -189,11 +189,11 @@ public class HazelcastClusterImpl implements ClusterManager { return _hazelcastInstanceLock; } - public List getKnownNodes() { - List nodesList = new ArrayList(); + public List getActiveNodes() { + List nodeList = new ArrayList(); for(Member m : _hazelcastInstance.getCluster().getMembers()) - nodesList.add(m.getUuid()) ; - return nodesList; + nodeList.add(m.getUuid()) ; + return nodeList; } } http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java deleted file mode 100644 index 3786912..0000000 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ode.scheduler.simple; - -public interface SchedulerListener { - - void memberAdded(String nodeId); - - void memberRemoved(String nodeId,String masterId); - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/3f5ef53a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java index 3b6ec4d..a0dbf5a 100644 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java @@ -22,6 +22,7 @@ package org.apache.ode.scheduler.simple; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.clapi.ClusterManager; +import org.apache.ode.bpel.clapi.ClusterMemberListener; import org.apache.ode.bpel.iapi.ContextException; import org.apache.ode.bpel.iapi.Scheduler; @@ -52,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong; * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m ) * */ -public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener { +public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberListener { private static final Log __log = LogFactory.getLog(SimpleScheduler.class); private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000; @@ -102,13 +103,21 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener private boolean _isClusterEnabled; + private String _masterId; + private ClusterManager _clusterManager; - /** All the nodes we know about */ - private CopyOnWriteArraySet _knownNodes = new CopyOnWriteArraySet(); + /** All the nodes which are taken from the database*/ + private CopyOnWriteArraySet _dbNodes = new CopyOnWriteArraySet(); + + /** All the stale nodes */ + private CopyOnWriteArraySet _staleNodes = new CopyOnWriteArraySet(); + + /** All the nodes when members are added to the cluster*/ + private CopyOnWriteArraySet _clusterNodes = new CopyOnWriteArraySet(); /** When we last heard from our nodes. */ - private ConcurrentHashMap _lastHeartBeat = new ConcurrentHashMap(); + //private ConcurrentHashMap _lastHeartBeat = new ConcurrentHashMap(); /** Set of outstanding jobs, i.e., jobs that have been enqueued but not dequeued or dispatched yet. Used to avoid cases where a job would be dispatched twice if the server is under high load and @@ -460,13 +469,15 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener _processedSinceLastLoadTask.clear(); _outstandingJobs.clear(); - _knownNodes.clear(); + _dbNodes.clear(); + _clusterNodes.clear(); + _staleNodes.clear(); try { execTransaction(new Callable() { public Void call() throws Exception { - _knownNodes.addAll(_db.getNodeIds()); + _dbNodes.addAll(_db.getNodeIds()); return null; } @@ -475,21 +486,21 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener __log.error("Error retrieving node list.", ex); throw new ContextException("Error retrieving node list.", ex); } + _clusterNodes.add(_nodeId); long now = System.currentTimeMillis(); // Pretend we got a heartbeat... - for (String s : _knownNodes) _lastHeartBeat.put(s, now); + //for (String s : _knownNodes) _lastHeartBeat.put(s, now); // schedule immediate job loading for now! _todo.enqueue(new LoadImmediateTask(now)); // schedule check for stale nodes, make it random so that the nodes don't overlap. - if (!_isClusterEnabled) - _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); + _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); // do the upgrade sometime (random) in the immediate interval. - enqueUpgradeJobsTask(now); + _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval))); _todo.start(); _running = true; @@ -517,16 +528,33 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener _running = false; } - public void memberAdded(final String nodeId) { - _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis()+ randomMean(_immediateInterval))); + public void memberAdded(String nodeId) { + _clusterNodes.add(nodeId); } - public void memberRemoved(final String nodeId, final String masterId) { - recoverClusterStaleNodes(nodeId, masterId); + public void memberRemoved(String nodeId) { + _staleNodes.add(nodeId); } - public void enqueUpgradeJobsTask(long now) { - _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval))); + // Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified. + public void memberElectedAsMaster() { + _masterId = _nodeId; + _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + randomMean(_staleInterval))); + _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + randomMean(_immediateInterval))); + _dbNodes.clear(); + try { + execTransaction(new Callable() { + + public Void call() throws Exception { + _dbNodes.addAll(_db.getNodeIds()); + return null; + } + + }); + } catch (Exception ex) { + __log.error("Error retrieving node list.", ex); + throw new ContextException("Error retrieving node list.", ex); + } } class RunJob implements Callable { @@ -701,7 +729,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener } } - public void updateHeartBeat(String nodeId) { + /*public void updateHeartBeat(String nodeId) { if (nodeId == null) return; @@ -710,7 +738,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener _lastHeartBeat.put(nodeId, System.currentTimeMillis()); _knownNodes.add(nodeId); - } + }*/ boolean doLoadImmediate() { __log.debug("LOAD IMMEDIATE started"); @@ -788,10 +816,19 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener boolean doUpgrade() { __log.debug("UPGRADE started"); - final ArrayList knownNodes = new ArrayList(_knownNodes); - // Don't forget about self. - knownNodes.add(_nodeId); - Collections.sort(knownNodes); + final ArrayList activeNodes; + + // for cluster mode + if (_isClusterEnabled && _clusterManager.getIsMaster()) { + activeNodes = (ArrayList) _clusterManager.getActiveNodes(); + } + //for standalone ODE deployments + else { + activeNodes = new ArrayList(); + activeNodes.add(_nodeId); + } + + Collections.sort(activeNodes); // We're going to try to upgrade near future jobs using the db only. // We assume that the distribution of the trailing digits in the @@ -803,9 +840,9 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener return execTransaction(new Callable() { public Boolean call() throws Exception { - int numNodes = knownNodes.size(); + int numNodes = activeNodes.size(); for (int i = 0; i < numNodes; ++i) { - String node = knownNodes.get(i); + String node = activeNodes.get(i); _db.updateAssignToNode(node, i, numNodes, maxtime); } return true; @@ -822,41 +859,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener } - boolean doClusterJobsUpgrade() { - __log.debug("UPGRADE started for Cluster Mode"); - final ArrayList knownNodes = _clusterManager.getKnownNodes(); - Collections.sort(knownNodes); - - // We're going to try to upgrade near future jobs using the db only. - // We assume that the distribution of the trailing digits in the - // scheduled time are uniformly distributed, and use modular division - // of the time by the number of nodes to create the node assignment. - // This can be done in a single update statement. - final long maxtime = System.currentTimeMillis() + _nearFutureInterval; - try { - return execTransaction(new Callable() { - - public Boolean call() throws Exception { - int numNodes = knownNodes.size(); - for (int i = 0; i < numNodes; ++i) { - String node = knownNodes.get(i); - _db.updateAssignToNode(node, i, numNodes, maxtime); - } - return true; - } - - }); - - } catch (Exception ex) { - __log.error("Database error upgrading jobs.", ex); - return false; - } finally { - __log.debug("UPGRADE complete"); - } - - } - - /** * Re-assign stale node's jobs to self. * @param nodeId @@ -876,10 +878,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener __log.debug("reassigned " + numrows + " jobs to self. "); } - // We can now forget about this node, if we see it again, it will be - // "new to us" - _knownNodes.remove(nodeId); - _lastHeartBeat.remove(nodeId); + if(_isClusterEnabled) _staleNodes.remove(nodeId); + + // If the stale node id is in _clusterNodes or _dbNodes, remove it. + _clusterNodes.remove(nodeId); + _dbNodes.remove(nodeId); // Force a load-immediate to catch anything new from the recovered node. doLoadImmediate(); @@ -900,31 +903,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener // return delay; // } - void recoverClusterStaleNodes(final String nodeId, final String masterId) { - if (__log.isDebugEnabled()) { - __log.debug("recovering stale nodes for Cluster Mode " + nodeId); - } - try { - int numrows = execTransaction(new Callable() { - public Integer call() throws Exception { - return _db.updateReassign(nodeId, masterId); - } - }); - - if (__log.isDebugEnabled()) { - __log.debug("reassigned " + numrows + " jobs to master node. "); - } - - // Force a load-immediate to catch anything new from the recovered node. - doLoadImmediate(); - - } catch (Exception ex) { - __log.error("Database error reassigning node.", ex); - } finally { - __log.debug("node recovery complete"); - } - } - private abstract class SchedulerTask extends Task implements Runnable { SchedulerTask(long schedDate) { super(schedDate); @@ -979,8 +957,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener boolean success = false; try { - if (_isClusterEnabled && _clusterManager.getIsMaster()) success = doClusterJobsUpgrade(); - else success = doUpgrade(); + success = doUpgrade(); } finally { long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000); _nextUpgrade.set(future); @@ -1003,14 +980,32 @@ public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener public void run() { _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval)); __log.debug("CHECK STALE NODES started"); - for (String nodeId : _knownNodes) { - Long lastSeen = _lastHeartBeat.get(nodeId); - if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval) - && !_nodeId.equals(nodeId)) - { + + ArrayList knownNodes = new ArrayList(); + knownNodes.addAll(_dbNodes); + knownNodes.addAll(_clusterNodes); + + // for cluster mode + if (_isClusterEnabled && _clusterManager.getIsMaster()) { + ArrayList memberList = (ArrayList) _clusterManager.getActiveNodes(); + + //find stale nodes + knownNodes.removeAll(memberList); + if (knownNodes.size() != 0) { + for (String nodeId : knownNodes) { + _staleNodes.add(nodeId); + } + } + for (String nodeId : _staleNodes) { recoverStaleNode(nodeId); } } + // for standalone ode node + else { + for (String nodeId : knownNodes) { + if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId); + } + } } }