Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3E7B617CB2 for ; Fri, 6 Nov 2015 10:50:57 +0000 (UTC) Received: (qmail 84800 invoked by uid 500); 6 Nov 2015 10:50:57 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 84726 invoked by uid 500); 6 Nov 2015 10:50:57 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 84081 invoked by uid 99); 6 Nov 2015 10:50:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Nov 2015 10:50:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A159EDFBC9; Fri, 6 Nov 2015 10:50:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sathwik@apache.org To: commits@ode.apache.org Date: Fri, 06 Nov 2015 10:51:21 -0000 Message-Id: <6f7d6ccc216c42dc904be467717e58f9@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/30] ode git commit: Cluster Enabled Simple Scheduler-3 Cluster Enabled Simple Scheduler-3 Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/43a8df89 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/43a8df89 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/43a8df89 Branch: refs/heads/ODE-563 Commit: 43a8df89b9b05c5831cc5fd4d385b018094f7429 Parents: 3f5ef53 Author: suba Authored: Thu Jul 23 15:52:25 2015 +0530 Committer: suba Committed: Thu Jul 23 15:52:25 2015 +0530 ---------------------------------------------------------------------- Rakefile | 2 +- .../java/org/apache/ode/axis2/ODEServer.java | 15 ++-- .../apache/ode/bpel/clapi/ClusterManager.java | 4 +- .../ode/bpel/clapi/ClusterMemberListener.java | 2 +- .../org/apache/ode/test/BPELTestAbstract.java | 2 +- .../hazelcast/HazelcastClusterImpl.java | 17 ++-- .../java/org/apache/ode/jbi/OdeLifeCycle.java | 2 +- .../ode/scheduler/simple/SimpleScheduler.java | 88 +++++++++++--------- 8 files changed, 71 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/Rakefile ---------------------------------------------------------------------- diff --git a/Rakefile b/Rakefile index 5475227..7c0fa67 100644 --- a/Rakefile +++ b/Rakefile @@ -208,7 +208,7 @@ define "ode" do desc "ODE Clustering" define "clustering" do - compile.with projects("bpel-api","bpel-store","scheduler-simple"),HAZELCAST, COMMONS.logging + compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging package :jar end http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index 222fedd..b3f5d2f 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -31,6 +31,7 @@ import org.apache.ode.axis2.service.DeploymentWebService; import org.apache.ode.axis2.service.ManagementService; import org.apache.ode.axis2.util.ClusterUrlTransformer; import org.apache.ode.bpel.clapi.ClusterManager; +import org.apache.ode.bpel.clapi.ClusterMemberListener; import org.apache.ode.bpel.connector.BpelServerConnector; import org.apache.ode.bpel.dao.BpelDAOConnectionFactory; import org.apache.ode.bpel.engine.BpelServerImpl; @@ -197,7 +198,9 @@ public class ODEServer { _store.loadAll(); if (_clusterManager != null) { _clusterManager.registerClusterProcessStoreMessageListener(); - _clusterManager.registerClusterMemberListener(_scheduler); + if (_scheduler instanceof SimpleScheduler) { + _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler); + } } try { @@ -527,10 +530,12 @@ public class ODEServer { } protected Scheduler createScheduler() { - String nodeId; - if (isClusteringEnabled) nodeId = _clusterManager.getUuid(); - else nodeId = new GUID().toString(); - SimpleScheduler scheduler = new SimpleScheduler(nodeId, new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled); + SimpleScheduler scheduler; + if (isClusteringEnabled) { + scheduler = new SimpleScheduler(_clusterManager.getUuid(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled); + scheduler.setClusterManager(_clusterManager); + } else + scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties()); scheduler.setExecutorService(_executorService); scheduler.setTransactionManager(_txMgr); return scheduler; http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java index a00959a..07d3d8d 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -59,9 +59,9 @@ public interface ClusterManager { /** * Register Scheduler as ClusterMemberListener - * @param scheduler + * @param listener */ - void registerClusterMemberListener(Object scheduler); + void registerClusterMemberListener(ClusterMemberListener listener); /** * Return deployment lock for cluster http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java index 4225f7d..541ab9c 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java @@ -24,6 +24,6 @@ public interface ClusterMemberListener { void memberRemoved(String nodeId); - void memberElectedAsMaster(); + void memberElectedAsMaster(String masterId); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java ---------------------------------------------------------------------- diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java index cdda50e..00bdf7d 100644 --- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java +++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java @@ -128,7 +128,7 @@ public abstract class BPELTestAbstract { { JdbcDelegate del = new JdbcDelegate(_dataSource); - scheduler = new SimpleScheduler("node", del, props,false); + scheduler = new SimpleScheduler("node", del, props); scheduler.setTransactionManager(_txManager); _cf = new BpelDAOConnectionFactoryImpl(scheduler); _server.setDaoConnectionFactory(_cf); http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 971df3e..f68068a 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -30,7 +30,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.clapi.*; -import org.apache.ode.scheduler.simple.SimpleScheduler; /** * This class implements necessary methods to build the cluster using hazelcast @@ -47,7 +46,7 @@ public class HazelcastClusterImpl implements ClusterManager { private IMap instance_lock_map; private ITopic clusterMessageTopic; private ClusterProcessStore _clusterProcessStore; - private SimpleScheduler _scheduler; + private ClusterMemberListener _listener; private ClusterLock _hazelcastDeploymentLock; private ClusterLock _hazelcastInstanceLock; @@ -78,6 +77,8 @@ public class HazelcastClusterImpl implements ClusterManager { uuid = localMember.getUuid(); __log.info("Registering HZ localMember ID " + nodeID); + markAsMaster(); + deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK); instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK); clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG); @@ -92,7 +93,7 @@ public class HazelcastClusterImpl implements ClusterManager { public void memberAdded(MembershipEvent membershipEvent) { String nodeId = membershipEvent.getMember().getUuid(); __log.info("Member Added " +nodeId); - _scheduler.memberAdded(nodeId); + if(isMaster && _listener != null) _listener.memberAdded(nodeId); } @Override @@ -100,7 +101,7 @@ public class HazelcastClusterImpl implements ClusterManager { String nodeId = membershipEvent.getMember().getUuid(); __log.info("Member Removed " + nodeId); markAsMaster(); - _scheduler.memberRemoved(nodeId); + if(isMaster && _listener != null) _listener.memberRemoved(nodeId); } @Override @@ -150,7 +151,7 @@ public class HazelcastClusterImpl implements ClusterManager { leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); if (leader.localMember() && isMaster == false) { isMaster = true; - _scheduler.memberElectedAsMaster(); + if(_listener != null) _listener.memberElectedAsMaster(uuid); } __log.info(isMaster); } @@ -171,10 +172,8 @@ public class HazelcastClusterImpl implements ClusterManager { clusterMessageTopic.addMessageListener(new ClusterMessageListener()); } - public void registerClusterMemberListener(Object scheduler) { - _scheduler = (SimpleScheduler) scheduler; - markAsMaster(); - _scheduler.setClusterManager(this); + public void registerClusterMemberListener(ClusterMemberListener listener) { + _listener = listener; } public void shutdown() { http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java ---------------------------------------------------------------------- diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java index 0c1b296..c885d13 100644 --- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java +++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java @@ -242,7 +242,7 @@ public class OdeLifeCycle implements ComponentLifeCycle { _ode._executorService = Executors.newCachedThreadPool(); else _ode._executorService = Executors.newFixedThreadPool(_ode._config.getThreadPoolMaxSize()); - _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties(),false); + _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties()); _ode._scheduler.setJobProcessor(_ode._server); _ode._scheduler.setExecutorService(_ode._executorService); _ode._scheduler.setTransactionManager((TransactionManager) _ode.getContext().getTransactionManager()); http://git-wip-us.apache.org/repos/asf/ode/blob/43a8df89/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java index a0dbf5a..df33ae0 100644 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java @@ -103,19 +103,14 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList private boolean _isClusterEnabled; - private String _masterId; - private ClusterManager _clusterManager; - /** All the nodes which are taken from the database*/ - private CopyOnWriteArraySet _dbNodes = new CopyOnWriteArraySet(); + /** All the nodes we know about */ + private CopyOnWriteArraySet _knownNodes = new CopyOnWriteArraySet(); /** All the stale nodes */ private CopyOnWriteArraySet _staleNodes = new CopyOnWriteArraySet(); - /** All the nodes when members are added to the cluster*/ - private CopyOnWriteArraySet _clusterNodes = new CopyOnWriteArraySet(); - /** When we last heard from our nodes. */ //private ConcurrentHashMap _lastHeartBeat = new ConcurrentHashMap(); @@ -146,6 +141,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS"); + public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) { + this(nodeId,del,conf,false); + } + public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) { _nodeId = nodeId; _db = del; @@ -469,25 +468,9 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList _processedSinceLastLoadTask.clear(); _outstandingJobs.clear(); - _dbNodes.clear(); - _clusterNodes.clear(); + _knownNodes.clear(); _staleNodes.clear(); - try { - execTransaction(new Callable() { - - public Void call() throws Exception { - _dbNodes.addAll(_db.getNodeIds()); - return null; - } - - }); - } catch (Exception ex) { - __log.error("Error retrieving node list.", ex); - throw new ContextException("Error retrieving node list.", ex); - } - _clusterNodes.add(_nodeId); - long now = System.currentTimeMillis(); // Pretend we got a heartbeat... @@ -496,11 +479,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList // schedule immediate job loading for now! _todo.enqueue(new LoadImmediateTask(now)); - // schedule check for stale nodes, make it random so that the nodes don't overlap. - _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); + if(!_isClusterEnabled) enqueueTasksReadnodeIds(); - // do the upgrade sometime (random) in the immediate interval. - _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval))); + else { + if (_clusterManager.getIsMaster()) enqueueTasksReadnodeIds(); + } _todo.start(); _running = true; @@ -529,7 +512,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList } public void memberAdded(String nodeId) { - _clusterNodes.add(nodeId); + _knownNodes.add(nodeId); } public void memberRemoved(String nodeId) { @@ -537,16 +520,16 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList } // Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified. - public void memberElectedAsMaster() { - _masterId = _nodeId; - _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + randomMean(_staleInterval))); - _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + randomMean(_immediateInterval))); - _dbNodes.clear(); + public void memberElectedAsMaster(String masterId) { + enqueueTasksReadnodeIds(); + } + + private void enqueueTasksReadnodeIds() { try { execTransaction(new Callable() { public Void call() throws Exception { - _dbNodes.addAll(_db.getNodeIds()); + _knownNodes.addAll(_db.getNodeIds()); return null; } @@ -555,6 +538,19 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList __log.error("Error retrieving node list.", ex); throw new ContextException("Error retrieving node list.", ex); } + + //make double sure all the active nodes are included into _knownNodes + if(_isClusterEnabled) _knownNodes.addAll(_clusterManager.getActiveNodes()); + + else _knownNodes.add(_nodeId); + + long now = System.currentTimeMillis(); + + // schedule check for stale nodes, make it random so that the nodes don't overlap. + _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); + + // do the upgrade sometime (random) in the immediate interval. + _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval))); } class RunJob implements Callable { @@ -880,9 +876,13 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList if(_isClusterEnabled) _staleNodes.remove(nodeId); - // If the stale node id is in _clusterNodes or _dbNodes, remove it. - _clusterNodes.remove(nodeId); - _dbNodes.remove(nodeId); + // If the stale node id is in _knownNodes, remove it. + _knownNodes.remove(nodeId); + + // We can now forget about this node, if we see it again, it will be + // "new to us" + //_knownNodes.remove(nodeId); + //_lastHeartBeat.remove(nodeId); // Force a load-immediate to catch anything new from the recovered node. doLoadImmediate(); @@ -981,9 +981,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval)); __log.debug("CHECK STALE NODES started"); - ArrayList knownNodes = new ArrayList(); - knownNodes.addAll(_dbNodes); - knownNodes.addAll(_clusterNodes); + ArrayList knownNodes = new ArrayList(_knownNodes); // for cluster mode if (_isClusterEnabled && _clusterManager.getIsMaster()) { @@ -1006,6 +1004,14 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId); } } + /*for (String nodeId : _knownNodes) { + Long lastSeen = _lastHeartBeat.get(nodeId); + if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval) + && !_nodeId.equals(nodeId)) + { + recoverStaleNode(nodeId); + } + }*/ } }