Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4648217CF2 for ; Fri, 6 Nov 2015 10:55:12 +0000 (UTC) Received: (qmail 92707 invoked by uid 500); 6 Nov 2015 10:55:12 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 92634 invoked by uid 500); 6 Nov 2015 10:55:12 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 91863 invoked by uid 99); 6 Nov 2015 10:55:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Nov 2015 10:55:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A411BE0216; Fri, 6 Nov 2015 10:55:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sathwik@apache.org To: commits@ode.apache.org Date: Fri, 06 Nov 2015 10:55:38 -0000 Message-Id: In-Reply-To: <2be1c5c173c842aca88ac670dc797713@git.apache.org> References: <2be1c5c173c842aca88ac670dc797713@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/30] ode git commit: Tested with two nodes cluster successfully Tested with two nodes cluster successfully Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/8fe5546d Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/8fe5546d Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/8fe5546d Branch: refs/heads/master Commit: 8fe5546d6528b1b2e970971af6c077077b871561 Parents: 348ae9d Author: suba Authored: Wed Aug 5 22:39:34 2015 +0530 Committer: suba Committed: Wed Aug 5 22:39:34 2015 +0530 ---------------------------------------------------------------------- .../src/main/webapp/WEB-INF/conf/hazelcast.xml | 63 ++++++++++++++++++++ .../java/org/apache/ode/axis2/ODEServer.java | 2 + .../hazelcast/HazelcastClusterImpl.java | 18 +++--- .../hazelcast/HazelcastDeploymentLock.java | 2 +- .../hazelcast/HazelcastInstanceLock.java | 3 +- .../ode/scheduler/simple/SimpleScheduler.java | 43 ++++++------- .../scheduler/simple/SimpleSchedulerTest.java | 27 ++++----- 7 files changed, 113 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml ---------------------------------------------------------------------- diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml new file mode 100644 index 0000000..bf1e99e --- /dev/null +++ b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml @@ -0,0 +1,63 @@ + + + + + + 5701 + + 0 + + false + + + 224.2.2.3 + 54327 + + + 127.0.0.1:5701 + 127.0.0.1:5702 + + + my-access-key + my-secret-key + us-west-1 + ec2.amazonaws.com + hazelcast-sg + type + hz-nodes + + 224.2.2.3 + 54327 + + + + + 10.10.1.* + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index 4860150..0a13c4a 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -201,6 +201,7 @@ public class ODEServer { _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler); _clusterManager.setClusterProcessStore((ClusterProcessStore) _store); _clusterManager.init(_configRoot); + ((SimpleScheduler)_scheduler).setNodeId(_clusterManager.getNodeID()); } try { @@ -483,6 +484,7 @@ public class ODEServer { } } + /** * Initialize the DAO. * http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 9d2a554..4c5cad5 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -57,8 +57,8 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster private IMap instance_lock_map; private ITopic clusterDeploymentMessageTopic; private ClusterProcessStore _clusterProcessStore; - private ClusterLock _hazelcastDeploymentLock; - private ClusterLock _hazelcastInstanceLock; + private HazelcastDeploymentLock hazelcastDeploymentLock; + private HazelcastInstanceLock hazelcastInstanceLock; private ClusterDeploymentMessageListener clusterDeploymentMessageListener; private ClusterMemberShipListener clusterMemberShipListener; private List clusterMemberListenerList = null; @@ -67,8 +67,11 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster clusterMemberShipListener = new ClusterMemberShipListener(); clusterDeploymentMessageListener = new ClusterDeploymentMessageListener(); clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this); + hazelcastDeploymentLock = new HazelcastDeploymentLock(); + hazelcastInstanceLock = new HazelcastInstanceLock(); } + public void init(File configRoot) { /*First,looks for the hazelcast.config system property. If it is set, its value is used as the path. @@ -101,9 +104,8 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK); clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC); - _hazelcastDeploymentLock = (ClusterLock) new HazelcastDeploymentLock(deployment_lock_map); - _hazelcastInstanceLock = (ClusterLock) new HazelcastInstanceLock(instance_lock_map); - + hazelcastDeploymentLock.setLockMap(deployment_lock_map); + hazelcastInstanceLock.setLockMap(instance_lock_map); markAsMaster(); } } @@ -221,7 +223,7 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster listener.memberElectedAsMaster(nodeID); } } - __log.info(isMaster); + __log.info("Master node: " +isMaster); } public boolean isMaster() { @@ -249,11 +251,11 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster } public ClusterLock getDeploymentLock(){ - return _hazelcastDeploymentLock; + return (ClusterLock)hazelcastDeploymentLock; } public ClusterLock getInstanceLock(){ - return _hazelcastInstanceLock; + return (ClusterLock)hazelcastInstanceLock; } public List getActiveNodes() { http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java index f36a1b4..b753305 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java @@ -31,7 +31,7 @@ public class HazelcastDeploymentLock implements ClusterLock{ private IMap _lock_map; - HazelcastDeploymentLock(IMap lock_map) { + public void setLockMap(IMap lock_map) { _lock_map = lock_map; } http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java index 1729bac..8ac11f8 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java @@ -31,8 +31,7 @@ public class HazelcastInstanceLock implements ClusterLock { private IMap _lock_map; - - HazelcastInstanceLock(IMap lock_map) { + public void setLockMap(IMap lock_map) { _lock_map = lock_map; } http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java index 1da5571..517045d 100644 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java @@ -479,10 +479,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList // schedule immediate job loading for now! _todo.enqueue(new LoadImmediateTask(now)); - if(!_isClusterEnabled) enqueueTasksReadnodeIds(); + if(!_isClusterEnabled) enqueueTasksReadnodeIds(now); else { - if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(); + if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(now); } _todo.start(); @@ -521,10 +521,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList // Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified. public void memberElectedAsMaster(String masterId) { - enqueueTasksReadnodeIds(); + long now = System.currentTimeMillis(); + enqueueTasksReadnodeIds(now); } - private void enqueueTasksReadnodeIds() { + private void enqueueTasksReadnodeIds(long now) { try { execTransaction(new Callable() { @@ -544,8 +545,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList else _knownNodes.add(_nodeId); - long now = System.currentTimeMillis(); - // schedule check for stale nodes, make it random so that the nodes don't overlap. _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); @@ -815,8 +814,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList final ArrayList activeNodes; // for cluster mode - if (_isClusterEnabled && _clusterManager.isMaster()) { - activeNodes = (ArrayList) _clusterManager.getActiveNodes(); + if (_isClusterEnabled) { + if (_clusterManager.isMaster()) { + activeNodes = (ArrayList) _clusterManager.getActiveNodes(); + } else activeNodes = null; } //for standalone ODE deployments else { @@ -984,24 +985,26 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList ArrayList knownNodes = new ArrayList(_knownNodes); // for cluster mode - if (_isClusterEnabled && _clusterManager.isMaster()) { - ArrayList memberList = (ArrayList) _clusterManager.getActiveNodes(); - - //find stale nodes - knownNodes.removeAll(memberList); - if (knownNodes.size() != 0) { - for (String nodeId : knownNodes) { - _staleNodes.add(nodeId); + if (_isClusterEnabled) { + if (_clusterManager.isMaster()) { + ArrayList memberList = (ArrayList) _clusterManager.getActiveNodes(); + + //find stale nodes + knownNodes.removeAll(memberList); + if (knownNodes.size() != 0) { + for (String nodeId : knownNodes) { + _staleNodes.add(nodeId); + } + } + for (String nodeId : _staleNodes) { + recoverStaleNode(nodeId); } - } - for (String nodeId : _staleNodes) { - recoverStaleNode(nodeId); } } // for standalone ode node else { for (String nodeId : knownNodes) { - if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId); + if (!_nodeId.equals(nodeId)) recoverStaleNode(nodeId); } } /*for (String nodeId : _knownNodes) { http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java index 4c89ae9..10e86fc 100644 --- a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java +++ b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java @@ -19,27 +19,26 @@ package org.apache.ode.scheduler.simple; -import java.util.*; - -import javax.transaction.RollbackException; -import javax.transaction.Status; -import javax.transaction.Synchronization; -import javax.transaction.SystemException; -import javax.transaction.TransactionManager; - import junit.framework.Assert; -import junit.framework.TestCase; - import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.iapi.Scheduler.JobInfo; import org.apache.ode.bpel.iapi.Scheduler.JobProcessor; import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException; -import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.junit.After; import org.junit.Before; import org.junit.Test; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; + +import java.util.ArrayList; +import java.util.Date; +import java.util.Properties; + public class SimpleSchedulerTest extends Assert implements JobProcessor { DelegateSupport _ds; @@ -210,10 +209,10 @@ public class SimpleSchedulerTest extends Assert implements JobProcessor { _scheduler.setImmediateInterval(1000); _scheduler.setStaleInterval(1000); _scheduler.start(); - for (int i = 0; i < 40; ++i) { - _scheduler.updateHeartBeat("n1"); + /*for (int i = 0; i < 40; ++i) { + _scheduler.updateHeartBeat("n1"); Thread.sleep(100); - } + }*/ _scheduler.stop(); Thread.sleep(1000);