ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sath...@apache.org
Subject [27/30] ode git commit: Done cleanup within cluster implementation
Date Fri, 06 Nov 2015 10:51:22 GMT
Done cleanup within cluster implementation


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/348ae9de
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/348ae9de
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/348ae9de

Branch: refs/heads/ODE-563
Commit: 348ae9deb804ac5e9a9f1bafd1ebdbf53b490e1a
Parents: 43a8df8
Author: suba <suba.11@cse.mrt.ac.lk>
Authored: Sun Jul 26 15:55:32 2015 +0530
Committer: suba <suba.11@cse.mrt.ac.lk>
Committed: Sun Jul 26 15:55:32 2015 +0530

----------------------------------------------------------------------
 .../java/org/apache/ode/axis2/ODEServer.java    |  37 ++---
 .../ode/axis2/deploy/DeploymentPoller.java      |   3 +-
 .../ode/axis2/service/DeploymentWebService.java |  40 ++---
 .../apache/ode/bpel/clapi/ClusterManager.java   |   7 +-
 .../bpel/clapi/ProcessStoreClusterEvent.java    |  11 +-
 .../bpel/clapi/ProcessStoreClusterListener.java |  24 +++
 .../ode/il/config/OdeConfigProperties.java      |  12 +-
 .../ode/store/ClusterProcessStoreImpl.java      |   5 +-
 .../hazelcast/HazelcastClusterImpl.java         | 152 ++++++++++++++-----
 .../hazelcast/HazelcastConstants.java           |   6 +-
 repositories.rb                                 |   2 +-
 .../ode/scheduler/simple/SimpleScheduler.java   |  16 +-
 12 files changed, 192 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index b3f5d2f..4860150 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -32,6 +32,7 @@ import org.apache.ode.axis2.service.ManagementService;
 import org.apache.ode.axis2.util.ClusterUrlTransformer;
 import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.clapi.ClusterMemberListener;
+import org.apache.ode.bpel.clapi.ClusterProcessStore;
 import org.apache.ode.bpel.connector.BpelServerConnector;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
@@ -119,7 +120,7 @@ public class ODEServer {
     
     public Runnable txMgrCreatedCallback;
 
-    private boolean isClusteringEnabled = false;
+    private boolean clusteringEnabled = false;
 
     public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException
{
         init(config.getServletContext().getRealPath("/WEB-INF"), configContext);
@@ -173,8 +174,8 @@ public class ODEServer {
             txMgrCreatedCallback.run();
         }
 
-        String clusteringState = _odeConfig.getClusteringState();
-        if (clusteringState != null && isClusteringEnabled(clusteringState)) {
+        clusteringEnabled = _odeConfig.isClusteringEnabled();
+        if (clusteringEnabled) {
             initClustering();
         } else __log.info(__msgs.msgOdeClusteringNotInitialized());
 
@@ -197,10 +198,9 @@ public class ODEServer {
 
         _store.loadAll();
         if (_clusterManager != null) {
-            _clusterManager.registerClusterProcessStoreMessageListener();
-            if (_scheduler instanceof SimpleScheduler) {
-                _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
-            }
+            _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
+            _clusterManager.setClusterProcessStore((ClusterProcessStore) _store);
+            _clusterManager.init(_configRoot);
         }
 
         try {
@@ -466,20 +466,8 @@ public class ODEServer {
         }
     }
 
-    private boolean isClusteringEnabled(String clusteringState) {
-        boolean state;
-        if (clusteringState.equals("true")) state = true;
-        else state = false;
-        setClustering(state);
-        return state;
-    }
-
-    private void  setClustering (boolean state) {
-        isClusteringEnabled = state;
-    }
-
-    public boolean getIsCluteringEnabled() {
-        return isClusteringEnabled;
+    public boolean isClusteringEnabled() {
+        return clusteringEnabled;
     }
 
     /**
@@ -493,7 +481,6 @@ public class ODEServer {
         } catch (Exception ex) {
             __log.error("Error while loading class : " + clusterImplName, ex);
         }
-        _clusterManager.init(_configRoot);
     }
 
     /**
@@ -524,15 +511,15 @@ public class ODEServer {
     }
 
     protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource
ds) {
-        if (isClusteringEnabled)
+        if (clusteringEnabled)
             return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(),
_odeConfig, false, _clusterManager);
         else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(),
_odeConfig, false);
     }
 
     protected Scheduler createScheduler() {
         SimpleScheduler scheduler;
-        if (isClusteringEnabled) {
-            scheduler = new SimpleScheduler(_clusterManager.getUuid(), new JdbcDelegate(_db.getDataSource()),
_odeConfig.getProperties(), isClusteringEnabled);
+        if (clusteringEnabled) {
+            scheduler = new SimpleScheduler(_clusterManager.getNodeID(), new JdbcDelegate(_db.getDataSource()),
_odeConfig.getProperties(), clusteringEnabled);
             scheduler.setClusterManager(_clusterManager);
         } else
             scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()),
_odeConfig.getProperties());

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index baa790b..169ca4f 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -49,7 +49,6 @@ import org.apache.ode.utils.WatchDog;
 import javax.xml.namespace.QName;
 import java.io.File;
 import java.io.FileFilter;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -99,7 +98,7 @@ public class DeploymentPoller {
     public DeploymentPoller(File deployDir, final ODEServer odeServer) {
         _odeServer = odeServer;
         _deployDir = deployDir;
-        clusterEnabled = _odeServer.getIsCluteringEnabled();
+        clusterEnabled = _odeServer.isClusteringEnabled();
         if (!_deployDir.exists()) {
             boolean isDeployDirCreated = _deployDir.mkdir();
             if (!isDeployDirCreated) {

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index 61bf00d..01e003b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -20,24 +20,6 @@
 package org.apache.ode.axis2.service;
 
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import javax.activation.DataHandler;
-import javax.wsdl.Definition;
-import javax.wsdl.WSDLException;
-import javax.wsdl.factory.WSDLFactory;
-import javax.wsdl.xml.WSDLReader;
-import javax.xml.namespace.QName;
-
 import org.apache.axiom.om.OMAbstractFactory;
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMNamespace;
@@ -45,27 +27,37 @@ import org.apache.axiom.om.OMText;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
-import org.apache.ode.bpel.clapi.ClusterLock;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.receivers.AbstractMessageReceiver;
 import org.apache.axis2.util.Utils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
 import org.apache.ode.axis2.ODEServer;
 import org.apache.ode.axis2.OdeFault;
 import org.apache.ode.axis2.deploy.DeploymentPoller;
 import org.apache.ode.axis2.hooks.ODEAxisService;
-import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.clapi.ClusterLock;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.ProcessStore;
 import org.apache.ode.il.OMUtils;
-import org.apache.ode.utils.fs.FileUtils;
 import org.apache.ode.utils.Namespaces;
+import org.apache.ode.utils.fs.FileUtils;
+
+import javax.activation.DataHandler;
+import javax.wsdl.Definition;
+import javax.wsdl.WSDLException;
+import javax.wsdl.factory.WSDLFactory;
+import javax.wsdl.xml.WSDLReader;
+import javax.xml.namespace.QName;
+import java.io.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 /**
  * Axis wrapper for process deployment.
@@ -95,7 +87,7 @@ public class DeploymentWebService {
         _store = store;
         _poller = poller;
         _odeServer = odeServer;
-        clusterEnabled = _odeServer.getIsCluteringEnabled();
+        clusterEnabled = _odeServer.isClusteringEnabled();
 
         Definition def;
         WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader();

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index 07d3d8d..5a2e0f9 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -38,7 +38,7 @@ public interface ClusterManager {
      * Return whether the local member is Master or not
      * @return
      */
-    boolean getIsMaster();
+    boolean isMaster();
 
     /**
      * Set the Process Store object which uses for clustering
@@ -55,8 +55,7 @@ public interface ClusterManager {
     /**
      * Register the cluster for message listener
      */
-    void registerClusterProcessStoreMessageListener();
-
+    void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener);
     /**
      * Register Scheduler as ClusterMemberListener
      * @param listener
@@ -81,5 +80,5 @@ public interface ClusterManager {
     /**
      * Return local member's uuid in the cluster
      */
-    String getUuid();
+    String getNodeID();
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
index a396f6f..79d9a78 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
@@ -27,7 +27,8 @@ public abstract class ProcessStoreClusterEvent implements Serializable {
 
     protected String info ;
 
-    private String uuid;
+    /** Unique ID of the Node in the Cluster generating the Event */
+    private String eventGeneratingNode;
 
     public ProcessStoreClusterEvent(String deploymentUnit) {
         this.deploymentUnit = deploymentUnit;
@@ -38,12 +39,12 @@ public abstract class ProcessStoreClusterEvent implements Serializable
{
         return "{ProcessStoreClusterEvent#" + deploymentUnit +"}";
     }
 
-    public void setUuid(String uuid) {
-        this.uuid = uuid;
+    public void setEventGeneratingNode(String uuid) {
+        this.eventGeneratingNode = uuid;
     }
 
-    public String getUuid() {
-        return uuid;
+    public String getEventGeneratingNode() {
+        return eventGeneratingNode;
     }
 
     public String getDuName() {

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
new file mode 100644
index 0000000..26f42cf
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.bpel.clapi;
+
+public interface ProcessStoreClusterListener {
+    public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
----------------------------------------------------------------------
diff --git a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
index 5c0ed13..5697422 100644
--- a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
+++ b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
@@ -19,6 +19,10 @@
 
 package org.apache.ode.il.config;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.utils.SystemUtils;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -26,10 +30,6 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.utils.SystemUtils;
-
 /**
  * Configuration object used for configuring the intergration layer. The propereties are
those likely to be common to all layers.
  *
@@ -295,8 +295,8 @@ public class OdeConfigProperties {
         return getProperty(OdeConfigProperties.PROP_DEPLOY_DIR);
     }
 
-    public String getClusteringState() {
-        return getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED);
+    public boolean isClusteringEnabled() {
+        return Boolean.valueOf(getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED, "false"));
     }
 
     public String getClusteringImplClass() {

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
index 51fea5a..d701e23 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -47,7 +47,6 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements
Cluster
     public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String
persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager)
{
         super(eprContext,ds,persistenceType,props,createDatamodel);
         _clusterManager = clusterManager;
-        _clusterManager.setClusterProcessStore(this);
     }
 
     public Collection<QName> deploy(final File deploymentUnitDirectory) {
@@ -59,7 +58,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements
Cluster
     private void publishProcessStoreDeployedEvent(String duName){
         deployedEvent = new ProcessStoreDeployedEvent(duName);
         _clusterManager.publishProcessStoreClusterEvent(deployedEvent);
-        __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getUuid());
+        __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getEventGeneratingNode());
     }
 
     public void deployProcesses(final String duName) {
@@ -121,7 +120,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements
Cluster
     private void publishProcessStoreUndeployedEvent(String duName){
         undeployedEvent = new ProcessStoreUndeployedEvent(duName);
         _clusterManager.publishProcessStoreClusterEvent(undeployedEvent);
-        __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getUuid());
+        __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getEventGeneratingNode());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index f68068a..9d2a554 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -18,8 +18,20 @@
  */
 package org.apache.ode.clustering.hazelcast;
 
-import com.hazelcast.core.*;
+import com.hazelcast.config.Config;
 import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.ListenerConfig;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -34,21 +46,28 @@ import org.apache.ode.bpel.clapi.*;
 /**
  * This class implements necessary methods to build the cluster using hazelcast
  */
-public class HazelcastClusterImpl implements ClusterManager {
+public class HazelcastClusterImpl implements ClusterManager, ProcessStoreClusterListener
{
     private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class);
 
     private HazelcastInstance _hazelcastInstance;
     private boolean isMaster = false;
+    private String nodeHostName;
     private String nodeID;
-    private String uuid;
-    private Member leader;
     private IMap<String, String> deployment_lock_map;
     private IMap<Long, Long> instance_lock_map;
-    private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
+    private ITopic<ProcessStoreClusterEvent> clusterDeploymentMessageTopic;
     private ClusterProcessStore _clusterProcessStore;
-    private ClusterMemberListener _listener;
     private ClusterLock<String> _hazelcastDeploymentLock;
     private ClusterLock<Long> _hazelcastInstanceLock;
+    private ClusterDeploymentMessageListener clusterDeploymentMessageListener;
+    private ClusterMemberShipListener clusterMemberShipListener;
+    private List<ClusterMemberListener> clusterMemberListenerList = null;
+
+    public HazelcastClusterImpl() {
+        clusterMemberShipListener = new ClusterMemberShipListener();
+        clusterDeploymentMessageListener = new ClusterDeploymentMessageListener();
+        clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this);
+    }
 
     public void init(File configRoot) {
 
@@ -63,7 +82,8 @@ public class HazelcastClusterImpl implements ClusterManager {
                 __log.error("hazelcast.xml does not exist or is not a file");
             else
                 try {
-                    _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml));
+                    Config config = loadConfig(hzXml);
+                    _hazelcastInstance = Hazelcast.newHazelcastInstance(config);
                 } catch (FileNotFoundException fnf) {
                     __log.error(fnf);
                 }
@@ -71,37 +91,71 @@ public class HazelcastClusterImpl implements ClusterManager {
 
         if (_hazelcastInstance != null) {
             // Registering this node in the cluster.
-            _hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener());
+            //_hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener());
             Member localMember = _hazelcastInstance.getCluster().getLocalMember();
-            nodeID = localMember.getInetSocketAddress().getHostName() +":" +localMember.getInetSocketAddress().getPort();
-            uuid = localMember.getUuid();
-            __log.info("Registering HZ localMember ID " + nodeID);
-
-            markAsMaster();
+            nodeHostName = localMember.getSocketAddress().getHostName() + ":" + localMember.getSocketAddress().getPort();
+            nodeID = localMember.getUuid();
+            __log.info("Registering HZ localMember:" + nodeHostName);
 
             deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
             instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
-            clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG);
+            clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
 
             _hazelcastDeploymentLock = (ClusterLock) new HazelcastDeploymentLock(deployment_lock_map);
             _hazelcastInstanceLock = (ClusterLock) new HazelcastInstanceLock(instance_lock_map);
+
+            markAsMaster();
         }
     }
 
+    protected Config loadConfig(File hazelcastConfigFile) throws FileNotFoundException {
+        Config config = new FileSystemXmlConfig(hazelcastConfigFile);
+
+        //add Cluster membership listener
+        ListenerConfig clusterMemberShipListenerConfig = new ListenerConfig();
+        clusterMemberShipListenerConfig.setImplementation(clusterMemberShipListener);
+        config.addListenerConfig(clusterMemberShipListenerConfig);
+
+        //set topic message listener
+        ListenerConfig topicListenerConfig = new ListenerConfig();
+        topicListenerConfig.setImplementation(clusterDeploymentMessageListener);
+        TopicConfig topicConfig = config.getTopicConfig(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
+        topicConfig.addMessageListenerConfig(topicListenerConfig);
+
+        return config;
+    }
+
     class ClusterMemberShipListener implements MembershipListener {
+
+        public ClusterMemberShipListener() {
+            clusterMemberListenerList = new ArrayList<ClusterMemberListener>();
+        }
+
+        public void registerClusterMemberListener(ClusterMemberListener listener) {
+            clusterMemberListenerList.add(listener);
+        }
+
         @Override
         public void memberAdded(MembershipEvent membershipEvent) {
-            String nodeId =  membershipEvent.getMember().getUuid();
-            __log.info("Member Added " +nodeId);
-            if(isMaster && _listener != null) _listener.memberAdded(nodeId);
+            String eventNodeID = membershipEvent.getMember().getUuid();
+            __log.info("Member Added " + eventNodeID);
+            if (isMaster) {
+                for (ClusterMemberListener listener : clusterMemberListenerList) {
+                    listener.memberAdded(eventNodeID);
+                }
+            }
         }
 
         @Override
         public void memberRemoved(MembershipEvent membershipEvent) {
-            String nodeId = membershipEvent.getMember().getUuid();
-            __log.info("Member Removed " + nodeId);
+            String eventNodeID = membershipEvent.getMember().getUuid();
+            __log.info("Member Removed " + eventNodeID);
             markAsMaster();
-            if(isMaster && _listener != null) _listener.memberRemoved(nodeId);
+            if (isMaster) {
+                for (ClusterMemberListener listener : clusterMemberListenerList) {
+                    listener.memberRemoved(eventNodeID);
+                }
+            }
         }
 
         @Override
@@ -111,36 +165,48 @@ public class HazelcastClusterImpl implements ClusterManager {
     }
 
     public void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent) {
-        clusterEvent.setUuid(uuid);
-        __log.info("Send " +clusterEvent.getInfo() +" Cluster Message " +"for " +clusterEvent.getDuName()
+" [" +nodeID +"]");
-        clusterMessageTopic.publish(clusterEvent);
+        clusterEvent.setEventGeneratingNode(nodeID);
+        __log.info("Send " + clusterEvent.getInfo() + " Cluster Message " + "for " + clusterEvent.getDuName()
+ " [" + nodeHostName + "]");
+        clusterDeploymentMessageTopic.publish(clusterEvent);
     }
 
 
-    class ClusterMessageListener implements MessageListener<ProcessStoreClusterEvent>
{
+    class ClusterDeploymentMessageListener implements MessageListener<ProcessStoreClusterEvent>
{
+        List<ProcessStoreClusterListener> clusterProcessStoreListenerList = null;
+
+        public ClusterDeploymentMessageListener() {
+            clusterProcessStoreListenerList = new ArrayList<ProcessStoreClusterListener>();
+        }
+
+        public void registerClusterProcessStoreListener(ProcessStoreClusterListener listener)
{
+            clusterProcessStoreListenerList.add(listener);
+        }
+
         @Override
         public void onMessage(Message<ProcessStoreClusterEvent> msg) {
-            handleEvent(msg.getMessageObject());
+            for (ProcessStoreClusterListener listener : clusterProcessStoreListenerList)
{
+                listener.onProcessStoreClusterEvent(msg.getMessageObject());
+            }
         }
     }
 
-    private void handleEvent(ProcessStoreClusterEvent message) {
+    public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message) {
         if (message instanceof ProcessStoreDeployedEvent) {
             ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
-            String eventUuid =  event.getUuid();
-            if (!uuid.equals(eventUuid)) {
+            String eventUuid = event.getEventGeneratingNode();
+            if (!nodeID.equals(eventUuid)) {
                 String duName = event.getDuName();
-                __log.info("Receive " +event.getInfo() +" Cluster Message " +"for " +event.getDuName()
+" [" +nodeID +"]");
+                __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " +
event.getDuName() + " [" + nodeHostName + "]");
                 _clusterProcessStore.deployProcesses(duName);
             }
         }
 
         else if (message instanceof ProcessStoreUndeployedEvent) {
             ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message;
-            String eventUuid =  event.getUuid();
-            if (!uuid.equals(eventUuid)) {
+            String eventUuid = event.getEventGeneratingNode();
+            if (!nodeID.equals(eventUuid)) {
                 String duName = event.getDuName();
-                __log.info("Receive " +event.getInfo() +"  Cluster Message " +"for " +event.getDuName()
+" [" +nodeID +"]");
+                __log.info("Receive " + event.getInfo() + "  Cluster Message " + "for " +
event.getDuName() + " [" + nodeHostName + "]");
                 _clusterProcessStore.undeployProcesses(duName);
             }
         }
@@ -148,36 +214,38 @@ public class HazelcastClusterImpl implements ClusterManager {
     }
 
     private void markAsMaster() {
-        leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
-        if (leader.localMember() && isMaster == false) {
+        Member member = _hazelcastInstance.getCluster().getMembers().iterator().next();
+        if (member.localMember() && isMaster == false) {
             isMaster = true;
-            if(_listener != null) _listener.memberElectedAsMaster(uuid);
+            for (ClusterMemberListener listener : clusterMemberListenerList) {
+                listener.memberElectedAsMaster(nodeID);
+            }
         }
         __log.info(isMaster);
     }
 
-    public boolean getIsMaster() {
+    public boolean isMaster() {
         return isMaster;
     }
 
-    public String getUuid() {
-        return uuid;
+    public String getNodeID() {
+        return nodeID;
     }
 
     public void setClusterProcessStore(ClusterProcessStore store) {
         _clusterProcessStore = store;
     }
 
-    public void registerClusterProcessStoreMessageListener() {
-        clusterMessageTopic.addMessageListener(new ClusterMessageListener());
+    public void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener)
{
+        clusterDeploymentMessageListener.registerClusterProcessStoreListener(listener);
     }
 
     public void registerClusterMemberListener(ClusterMemberListener listener) {
-        _listener = listener;
+        clusterMemberShipListener.registerClusterMemberListener(listener);
     }
 
     public void shutdown() {
-        if(_hazelcastInstance != null) _hazelcastInstance.getLifecycleService().shutdown();
+        if (_hazelcastInstance != null) _hazelcastInstance.shutdown();
     }
 
     public ClusterLock<String> getDeploymentLock(){

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
index 76e7341..aa787e9 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
@@ -21,9 +21,9 @@ package org.apache.ode.clustering.hazelcast;
  * Constants used in Hazelcast based clustering implementation
  */
 public final class HazelcastConstants {
-    public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "DEPLOYMENT_LOCK";
-    public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK  = "PROCESS_INSTANCE_LOCK
";
-    public static final String ODE_CLUSTER_MSG = "CLUSTER_MSG";
+    public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "ODE_DEPLOYMENT_LOCK";
+    public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK  = "ODE_PROCESS_INSTANCE_LOCK
";
+    public static final String ODE_CLUSTER_DEPLOYMENT_TOPIC = "ODE_DEPLOYMENT_TOPIC";
 
     private HazelcastConstants() {
     }

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/repositories.rb
----------------------------------------------------------------------
diff --git a/repositories.rb b/repositories.rb
index f6a40fa..e23cde1 100644
--- a/repositories.rb
+++ b/repositories.rb
@@ -15,5 +15,5 @@
 
 repositories.remote << "http://repo1.maven.org/maven2"
 repositories.remote << "http://people.apache.org/~vanto/m2/"
-repositories.remote << "https://repository.apache.org/content/groups/snapshots"
+repositories.remote << "http://repository.apache.org/content/groups/snapshots"
 repositories.release_to[:url] ||= "sftp://guest@localhost/home/guest"

http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index df33ae0..1da5571 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -482,7 +482,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
         if(!_isClusterEnabled) enqueueTasksReadnodeIds();
 
         else {
-            if (_clusterManager.getIsMaster()) enqueueTasksReadnodeIds();
+            if (_clusterManager.isMaster()) enqueueTasksReadnodeIds();
         }
 
         _todo.start();
@@ -725,16 +725,16 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
         }
     }
 
-    /*public void updateHeartBeat(String nodeId) {
+    public void updateHeartBeat(String nodeId) {
         if (nodeId == null)
             return;
 
-        if (_nodeId.equals(nodeId))
-            return;
+        /*if (_nodeId.equals(nodeId))
+            return;*/
 
-        _lastHeartBeat.put(nodeId, System.currentTimeMillis());
+        //_lastHeartBeat.put(nodeId, System.currentTimeMillis());
         _knownNodes.add(nodeId);
-    }*/
+    }
 
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
@@ -815,7 +815,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
         final ArrayList<String> activeNodes;
 
         // for cluster mode
-        if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+        if (_isClusterEnabled && _clusterManager.isMaster()) {
             activeNodes = (ArrayList) _clusterManager.getActiveNodes();
         }
         //for standalone ODE deployments
@@ -984,7 +984,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
             ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
 
             // for cluster mode
-            if (_isClusterEnabled && _clusterManager.getIsMaster()) {
+            if (_isClusterEnabled && _clusterManager.isMaster()) {
                 ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes();
 
                 //find stale nodes


Mime
View raw message