Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4483817CB4 for ; Fri, 6 Nov 2015 10:50:57 +0000 (UTC) Received: (qmail 84875 invoked by uid 500); 6 Nov 2015 10:50:57 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 84795 invoked by uid 500); 6 Nov 2015 10:50:57 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 84102 invoked by uid 99); 6 Nov 2015 10:50:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Nov 2015 10:50:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A640CDF9F1; Fri, 6 Nov 2015 10:50:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sathwik@apache.org To: commits@ode.apache.org Date: Fri, 06 Nov 2015 10:51:22 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [27/30] ode git commit: Done cleanup within cluster implementation Done cleanup within cluster implementation Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/348ae9de Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/348ae9de Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/348ae9de Branch: refs/heads/ODE-563 Commit: 348ae9deb804ac5e9a9f1bafd1ebdbf53b490e1a Parents: 43a8df8 Author: suba Authored: Sun Jul 26 15:55:32 2015 +0530 Committer: suba Committed: Sun Jul 26 15:55:32 2015 +0530 ---------------------------------------------------------------------- .../java/org/apache/ode/axis2/ODEServer.java | 37 ++--- .../ode/axis2/deploy/DeploymentPoller.java | 3 +- .../ode/axis2/service/DeploymentWebService.java | 40 ++--- .../apache/ode/bpel/clapi/ClusterManager.java | 7 +- .../bpel/clapi/ProcessStoreClusterEvent.java | 11 +- .../bpel/clapi/ProcessStoreClusterListener.java | 24 +++ .../ode/il/config/OdeConfigProperties.java | 12 +- .../ode/store/ClusterProcessStoreImpl.java | 5 +- .../hazelcast/HazelcastClusterImpl.java | 152 ++++++++++++++----- .../hazelcast/HazelcastConstants.java | 6 +- repositories.rb | 2 +- .../ode/scheduler/simple/SimpleScheduler.java | 16 +- 12 files changed, 192 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index b3f5d2f..4860150 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -32,6 +32,7 @@ import org.apache.ode.axis2.service.ManagementService; import org.apache.ode.axis2.util.ClusterUrlTransformer; import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.clapi.ClusterMemberListener; +import org.apache.ode.bpel.clapi.ClusterProcessStore; import org.apache.ode.bpel.connector.BpelServerConnector; import org.apache.ode.bpel.dao.BpelDAOConnectionFactory; import org.apache.ode.bpel.engine.BpelServerImpl; @@ -119,7 +120,7 @@ public class ODEServer { public Runnable txMgrCreatedCallback; - private boolean isClusteringEnabled = false; + private boolean clusteringEnabled = false; public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException { init(config.getServletContext().getRealPath("/WEB-INF"), configContext); @@ -173,8 +174,8 @@ public class ODEServer { txMgrCreatedCallback.run(); } - String clusteringState = _odeConfig.getClusteringState(); - if (clusteringState != null && isClusteringEnabled(clusteringState)) { + clusteringEnabled = _odeConfig.isClusteringEnabled(); + if (clusteringEnabled) { initClustering(); } else __log.info(__msgs.msgOdeClusteringNotInitialized()); @@ -197,10 +198,9 @@ public class ODEServer { _store.loadAll(); if (_clusterManager != null) { - _clusterManager.registerClusterProcessStoreMessageListener(); - if (_scheduler instanceof SimpleScheduler) { - _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler); - } + _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler); + _clusterManager.setClusterProcessStore((ClusterProcessStore) _store); + _clusterManager.init(_configRoot); } try { @@ -466,20 +466,8 @@ public class ODEServer { } } - private boolean isClusteringEnabled(String clusteringState) { - boolean state; - if (clusteringState.equals("true")) state = true; - else state = false; - setClustering(state); - return state; - } - - private void setClustering (boolean state) { - isClusteringEnabled = state; - } - - public boolean getIsCluteringEnabled() { - return isClusteringEnabled; + public boolean isClusteringEnabled() { + return clusteringEnabled; } /** @@ -493,7 +481,6 @@ public class ODEServer { } catch (Exception ex) { __log.error("Error while loading class : " + clusterImplName, ex); } - _clusterManager.init(_configRoot); } /** @@ -524,15 +511,15 @@ public class ODEServer { } protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource ds) { - if (isClusteringEnabled) + if (clusteringEnabled) return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, _clusterManager); else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false); } protected Scheduler createScheduler() { SimpleScheduler scheduler; - if (isClusteringEnabled) { - scheduler = new SimpleScheduler(_clusterManager.getUuid(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled); + if (clusteringEnabled) { + scheduler = new SimpleScheduler(_clusterManager.getNodeID(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), clusteringEnabled); scheduler.setClusterManager(_clusterManager); } else scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties()); http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java index baa790b..169ca4f 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java +++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java @@ -49,7 +49,6 @@ import org.apache.ode.utils.WatchDog; import javax.xml.namespace.QName; import java.io.File; import java.io.FileFilter; -import java.io.FilenameFilter; import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -99,7 +98,7 @@ public class DeploymentPoller { public DeploymentPoller(File deployDir, final ODEServer odeServer) { _odeServer = odeServer; _deployDir = deployDir; - clusterEnabled = _odeServer.getIsCluteringEnabled(); + clusterEnabled = _odeServer.isClusteringEnabled(); if (!_deployDir.exists()) { boolean isDeployDirCreated = _deployDir.mkdir(); if (!isDeployDirCreated) { http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java index 61bf00d..01e003b 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java +++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java @@ -20,24 +20,6 @@ package org.apache.ode.axis2.service; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collection; -import java.util.List; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - -import javax.activation.DataHandler; -import javax.wsdl.Definition; -import javax.wsdl.WSDLException; -import javax.wsdl.factory.WSDLFactory; -import javax.wsdl.xml.WSDLReader; -import javax.xml.namespace.QName; - import org.apache.axiom.om.OMAbstractFactory; import org.apache.axiom.om.OMElement; import org.apache.axiom.om.OMNamespace; @@ -45,27 +27,37 @@ import org.apache.axiom.om.OMText; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPFactory; import org.apache.axis2.AxisFault; -import org.apache.ode.bpel.clapi.ClusterLock; import org.apache.axis2.context.MessageContext; import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.AxisOperation; import org.apache.axis2.engine.AxisConfiguration; import org.apache.axis2.engine.AxisEngine; import org.apache.axis2.receivers.AbstractMessageReceiver; import org.apache.axis2.util.Utils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.lang.StringUtils; import org.apache.ode.axis2.ODEServer; import org.apache.ode.axis2.OdeFault; import org.apache.ode.axis2.deploy.DeploymentPoller; import org.apache.ode.axis2.hooks.ODEAxisService; -import org.apache.ode.bpel.iapi.BpelServer; +import org.apache.ode.bpel.clapi.ClusterLock; import org.apache.ode.bpel.iapi.ProcessConf; import org.apache.ode.bpel.iapi.ProcessStore; import org.apache.ode.il.OMUtils; -import org.apache.ode.utils.fs.FileUtils; import org.apache.ode.utils.Namespaces; +import org.apache.ode.utils.fs.FileUtils; + +import javax.activation.DataHandler; +import javax.wsdl.Definition; +import javax.wsdl.WSDLException; +import javax.wsdl.factory.WSDLFactory; +import javax.wsdl.xml.WSDLReader; +import javax.xml.namespace.QName; +import java.io.*; +import java.util.Collection; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; /** * Axis wrapper for process deployment. @@ -95,7 +87,7 @@ public class DeploymentWebService { _store = store; _poller = poller; _odeServer = odeServer; - clusterEnabled = _odeServer.getIsCluteringEnabled(); + clusterEnabled = _odeServer.isClusteringEnabled(); Definition def; WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader(); http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java index 07d3d8d..5a2e0f9 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -38,7 +38,7 @@ public interface ClusterManager { * Return whether the local member is Master or not * @return */ - boolean getIsMaster(); + boolean isMaster(); /** * Set the Process Store object which uses for clustering @@ -55,8 +55,7 @@ public interface ClusterManager { /** * Register the cluster for message listener */ - void registerClusterProcessStoreMessageListener(); - + void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener); /** * Register Scheduler as ClusterMemberListener * @param listener @@ -81,5 +80,5 @@ public interface ClusterManager { /** * Return local member's uuid in the cluster */ - String getUuid(); + String getNodeID(); } http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java index a396f6f..79d9a78 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java @@ -27,7 +27,8 @@ public abstract class ProcessStoreClusterEvent implements Serializable { protected String info ; - private String uuid; + /** Unique ID of the Node in the Cluster generating the Event */ + private String eventGeneratingNode; public ProcessStoreClusterEvent(String deploymentUnit) { this.deploymentUnit = deploymentUnit; @@ -38,12 +39,12 @@ public abstract class ProcessStoreClusterEvent implements Serializable { return "{ProcessStoreClusterEvent#" + deploymentUnit +"}"; } - public void setUuid(String uuid) { - this.uuid = uuid; + public void setEventGeneratingNode(String uuid) { + this.eventGeneratingNode = uuid; } - public String getUuid() { - return uuid; + public String getEventGeneratingNode() { + return eventGeneratingNode; } public String getDuName() { http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java new file mode 100644 index 0000000..26f42cf --- /dev/null +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ode.bpel.clapi; + +public interface ProcessStoreClusterListener { + public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java ---------------------------------------------------------------------- diff --git a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java index 5c0ed13..5697422 100644 --- a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java +++ b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java @@ -19,6 +19,10 @@ package org.apache.ode.il.config; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.utils.SystemUtils; + import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -26,10 +30,6 @@ import java.io.IOException; import java.util.Map; import java.util.Properties; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ode.utils.SystemUtils; - /** * Configuration object used for configuring the intergration layer. The propereties are those likely to be common to all layers. * @@ -295,8 +295,8 @@ public class OdeConfigProperties { return getProperty(OdeConfigProperties.PROP_DEPLOY_DIR); } - public String getClusteringState() { - return getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED); + public boolean isClusteringEnabled() { + return Boolean.valueOf(getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED, "false")); } public String getClusteringImplClass() { http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java index 51fea5a..d701e23 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java @@ -47,7 +47,6 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements Cluster public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) { super(eprContext,ds,persistenceType,props,createDatamodel); _clusterManager = clusterManager; - _clusterManager.setClusterProcessStore(this); } public Collection deploy(final File deploymentUnitDirectory) { @@ -59,7 +58,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements Cluster private void publishProcessStoreDeployedEvent(String duName){ deployedEvent = new ProcessStoreDeployedEvent(duName); _clusterManager.publishProcessStoreClusterEvent(deployedEvent); - __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getUuid()); + __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getEventGeneratingNode()); } public void deployProcesses(final String duName) { @@ -121,7 +120,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl implements Cluster private void publishProcessStoreUndeployedEvent(String duName){ undeployedEvent = new ProcessStoreUndeployedEvent(duName); _clusterManager.publishProcessStoreClusterEvent(undeployedEvent); - __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getUuid()); + __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getEventGeneratingNode()); } /** http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index f68068a..9d2a554 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -18,8 +18,20 @@ */ package org.apache.ode.clustering.hazelcast; -import com.hazelcast.core.*; +import com.hazelcast.config.Config; import com.hazelcast.config.FileSystemXmlConfig; +import com.hazelcast.config.ListenerConfig; +import com.hazelcast.config.TopicConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IMap; +import com.hazelcast.core.ITopic; +import com.hazelcast.core.Member; +import com.hazelcast.core.MemberAttributeEvent; +import com.hazelcast.core.MembershipEvent; +import com.hazelcast.core.MembershipListener; +import com.hazelcast.core.Message; +import com.hazelcast.core.MessageListener; import java.io.File; import java.io.FileNotFoundException; @@ -34,21 +46,28 @@ import org.apache.ode.bpel.clapi.*; /** * This class implements necessary methods to build the cluster using hazelcast */ -public class HazelcastClusterImpl implements ClusterManager { +public class HazelcastClusterImpl implements ClusterManager, ProcessStoreClusterListener { private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class); private HazelcastInstance _hazelcastInstance; private boolean isMaster = false; + private String nodeHostName; private String nodeID; - private String uuid; - private Member leader; private IMap deployment_lock_map; private IMap instance_lock_map; - private ITopic clusterMessageTopic; + private ITopic clusterDeploymentMessageTopic; private ClusterProcessStore _clusterProcessStore; - private ClusterMemberListener _listener; private ClusterLock _hazelcastDeploymentLock; private ClusterLock _hazelcastInstanceLock; + private ClusterDeploymentMessageListener clusterDeploymentMessageListener; + private ClusterMemberShipListener clusterMemberShipListener; + private List clusterMemberListenerList = null; + + public HazelcastClusterImpl() { + clusterMemberShipListener = new ClusterMemberShipListener(); + clusterDeploymentMessageListener = new ClusterDeploymentMessageListener(); + clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this); + } public void init(File configRoot) { @@ -63,7 +82,8 @@ public class HazelcastClusterImpl implements ClusterManager { __log.error("hazelcast.xml does not exist or is not a file"); else try { - _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml)); + Config config = loadConfig(hzXml); + _hazelcastInstance = Hazelcast.newHazelcastInstance(config); } catch (FileNotFoundException fnf) { __log.error(fnf); } @@ -71,37 +91,71 @@ public class HazelcastClusterImpl implements ClusterManager { if (_hazelcastInstance != null) { // Registering this node in the cluster. - _hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener()); + //_hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener()); Member localMember = _hazelcastInstance.getCluster().getLocalMember(); - nodeID = localMember.getInetSocketAddress().getHostName() +":" +localMember.getInetSocketAddress().getPort(); - uuid = localMember.getUuid(); - __log.info("Registering HZ localMember ID " + nodeID); - - markAsMaster(); + nodeHostName = localMember.getSocketAddress().getHostName() + ":" + localMember.getSocketAddress().getPort(); + nodeID = localMember.getUuid(); + __log.info("Registering HZ localMember:" + nodeHostName); deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK); instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK); - clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG); + clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC); _hazelcastDeploymentLock = (ClusterLock) new HazelcastDeploymentLock(deployment_lock_map); _hazelcastInstanceLock = (ClusterLock) new HazelcastInstanceLock(instance_lock_map); + + markAsMaster(); } } + protected Config loadConfig(File hazelcastConfigFile) throws FileNotFoundException { + Config config = new FileSystemXmlConfig(hazelcastConfigFile); + + //add Cluster membership listener + ListenerConfig clusterMemberShipListenerConfig = new ListenerConfig(); + clusterMemberShipListenerConfig.setImplementation(clusterMemberShipListener); + config.addListenerConfig(clusterMemberShipListenerConfig); + + //set topic message listener + ListenerConfig topicListenerConfig = new ListenerConfig(); + topicListenerConfig.setImplementation(clusterDeploymentMessageListener); + TopicConfig topicConfig = config.getTopicConfig(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC); + topicConfig.addMessageListenerConfig(topicListenerConfig); + + return config; + } + class ClusterMemberShipListener implements MembershipListener { + + public ClusterMemberShipListener() { + clusterMemberListenerList = new ArrayList(); + } + + public void registerClusterMemberListener(ClusterMemberListener listener) { + clusterMemberListenerList.add(listener); + } + @Override public void memberAdded(MembershipEvent membershipEvent) { - String nodeId = membershipEvent.getMember().getUuid(); - __log.info("Member Added " +nodeId); - if(isMaster && _listener != null) _listener.memberAdded(nodeId); + String eventNodeID = membershipEvent.getMember().getUuid(); + __log.info("Member Added " + eventNodeID); + if (isMaster) { + for (ClusterMemberListener listener : clusterMemberListenerList) { + listener.memberAdded(eventNodeID); + } + } } @Override public void memberRemoved(MembershipEvent membershipEvent) { - String nodeId = membershipEvent.getMember().getUuid(); - __log.info("Member Removed " + nodeId); + String eventNodeID = membershipEvent.getMember().getUuid(); + __log.info("Member Removed " + eventNodeID); markAsMaster(); - if(isMaster && _listener != null) _listener.memberRemoved(nodeId); + if (isMaster) { + for (ClusterMemberListener listener : clusterMemberListenerList) { + listener.memberRemoved(eventNodeID); + } + } } @Override @@ -111,36 +165,48 @@ public class HazelcastClusterImpl implements ClusterManager { } public void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent) { - clusterEvent.setUuid(uuid); - __log.info("Send " +clusterEvent.getInfo() +" Cluster Message " +"for " +clusterEvent.getDuName() +" [" +nodeID +"]"); - clusterMessageTopic.publish(clusterEvent); + clusterEvent.setEventGeneratingNode(nodeID); + __log.info("Send " + clusterEvent.getInfo() + " Cluster Message " + "for " + clusterEvent.getDuName() + " [" + nodeHostName + "]"); + clusterDeploymentMessageTopic.publish(clusterEvent); } - class ClusterMessageListener implements MessageListener { + class ClusterDeploymentMessageListener implements MessageListener { + List clusterProcessStoreListenerList = null; + + public ClusterDeploymentMessageListener() { + clusterProcessStoreListenerList = new ArrayList(); + } + + public void registerClusterProcessStoreListener(ProcessStoreClusterListener listener) { + clusterProcessStoreListenerList.add(listener); + } + @Override public void onMessage(Message msg) { - handleEvent(msg.getMessageObject()); + for (ProcessStoreClusterListener listener : clusterProcessStoreListenerList) { + listener.onProcessStoreClusterEvent(msg.getMessageObject()); + } } } - private void handleEvent(ProcessStoreClusterEvent message) { + public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message) { if (message instanceof ProcessStoreDeployedEvent) { ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message; - String eventUuid = event.getUuid(); - if (!uuid.equals(eventUuid)) { + String eventUuid = event.getEventGeneratingNode(); + if (!nodeID.equals(eventUuid)) { String duName = event.getDuName(); - __log.info("Receive " +event.getInfo() +" Cluster Message " +"for " +event.getDuName() +" [" +nodeID +"]"); + __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]"); _clusterProcessStore.deployProcesses(duName); } } else if (message instanceof ProcessStoreUndeployedEvent) { ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message; - String eventUuid = event.getUuid(); - if (!uuid.equals(eventUuid)) { + String eventUuid = event.getEventGeneratingNode(); + if (!nodeID.equals(eventUuid)) { String duName = event.getDuName(); - __log.info("Receive " +event.getInfo() +" Cluster Message " +"for " +event.getDuName() +" [" +nodeID +"]"); + __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]"); _clusterProcessStore.undeployProcesses(duName); } } @@ -148,36 +214,38 @@ public class HazelcastClusterImpl implements ClusterManager { } private void markAsMaster() { - leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); - if (leader.localMember() && isMaster == false) { + Member member = _hazelcastInstance.getCluster().getMembers().iterator().next(); + if (member.localMember() && isMaster == false) { isMaster = true; - if(_listener != null) _listener.memberElectedAsMaster(uuid); + for (ClusterMemberListener listener : clusterMemberListenerList) { + listener.memberElectedAsMaster(nodeID); + } } __log.info(isMaster); } - public boolean getIsMaster() { + public boolean isMaster() { return isMaster; } - public String getUuid() { - return uuid; + public String getNodeID() { + return nodeID; } public void setClusterProcessStore(ClusterProcessStore store) { _clusterProcessStore = store; } - public void registerClusterProcessStoreMessageListener() { - clusterMessageTopic.addMessageListener(new ClusterMessageListener()); + public void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener) { + clusterDeploymentMessageListener.registerClusterProcessStoreListener(listener); } public void registerClusterMemberListener(ClusterMemberListener listener) { - _listener = listener; + clusterMemberShipListener.registerClusterMemberListener(listener); } public void shutdown() { - if(_hazelcastInstance != null) _hazelcastInstance.getLifecycleService().shutdown(); + if (_hazelcastInstance != null) _hazelcastInstance.shutdown(); } public ClusterLock getDeploymentLock(){ http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java index 76e7341..aa787e9 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java @@ -21,9 +21,9 @@ package org.apache.ode.clustering.hazelcast; * Constants used in Hazelcast based clustering implementation */ public final class HazelcastConstants { - public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "DEPLOYMENT_LOCK"; - public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK = "PROCESS_INSTANCE_LOCK "; - public static final String ODE_CLUSTER_MSG = "CLUSTER_MSG"; + public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "ODE_DEPLOYMENT_LOCK"; + public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK = "ODE_PROCESS_INSTANCE_LOCK "; + public static final String ODE_CLUSTER_DEPLOYMENT_TOPIC = "ODE_DEPLOYMENT_TOPIC"; private HazelcastConstants() { } http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/repositories.rb ---------------------------------------------------------------------- diff --git a/repositories.rb b/repositories.rb index f6a40fa..e23cde1 100644 --- a/repositories.rb +++ b/repositories.rb @@ -15,5 +15,5 @@ repositories.remote << "http://repo1.maven.org/maven2" repositories.remote << "http://people.apache.org/~vanto/m2/" -repositories.remote << "https://repository.apache.org/content/groups/snapshots" +repositories.remote << "http://repository.apache.org/content/groups/snapshots" repositories.release_to[:url] ||= "sftp://guest@localhost/home/guest" http://git-wip-us.apache.org/repos/asf/ode/blob/348ae9de/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java index df33ae0..1da5571 100644 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java @@ -482,7 +482,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList if(!_isClusterEnabled) enqueueTasksReadnodeIds(); else { - if (_clusterManager.getIsMaster()) enqueueTasksReadnodeIds(); + if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(); } _todo.start(); @@ -725,16 +725,16 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList } } - /*public void updateHeartBeat(String nodeId) { + public void updateHeartBeat(String nodeId) { if (nodeId == null) return; - if (_nodeId.equals(nodeId)) - return; + /*if (_nodeId.equals(nodeId)) + return;*/ - _lastHeartBeat.put(nodeId, System.currentTimeMillis()); + //_lastHeartBeat.put(nodeId, System.currentTimeMillis()); _knownNodes.add(nodeId); - }*/ + } boolean doLoadImmediate() { __log.debug("LOAD IMMEDIATE started"); @@ -815,7 +815,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList final ArrayList activeNodes; // for cluster mode - if (_isClusterEnabled && _clusterManager.getIsMaster()) { + if (_isClusterEnabled && _clusterManager.isMaster()) { activeNodes = (ArrayList) _clusterManager.getActiveNodes(); } //for standalone ODE deployments @@ -984,7 +984,7 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList ArrayList knownNodes = new ArrayList(_knownNodes); // for cluster mode - if (_isClusterEnabled && _clusterManager.getIsMaster()) { + if (_isClusterEnabled && _clusterManager.isMaster()) { ArrayList memberList = (ArrayList) _clusterManager.getActiveNodes(); //find stale nodes