Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C1F1A17C9B for ; Fri, 6 Nov 2015 10:50:56 +0000 (UTC) Received: (qmail 84033 invoked by uid 500); 6 Nov 2015 10:50:56 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 83956 invoked by uid 500); 6 Nov 2015 10:50:56 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 83700 invoked by uid 99); 6 Nov 2015 10:50:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Nov 2015 10:50:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55BC8E07F6; Fri, 6 Nov 2015 10:50:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sathwik@apache.org To: commits@ode.apache.org Date: Fri, 06 Nov 2015 10:51:03 -0000 Message-Id: <212bf1dba78c47be856a82499a0fc527@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/30] ode git commit: redesigning phase 2 redesigning phase 2 Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/521d640d Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/521d640d Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/521d640d Branch: refs/heads/ODE-563 Commit: 521d640d672133d3864a61c54ba2b59e70e35a4b Parents: afa36ee Author: suba Authored: Tue Jun 16 17:20:04 2015 +0530 Committer: suba Committed: Tue Jun 16 17:20:04 2015 +0530 ---------------------------------------------------------------------- Rakefile | 10 +-- .../java/org/apache/ode/axis2/ODEServer.java | 6 +- .../ode/axis2/deploy/DeploymentPoller.java | 29 ++++---- .../ode/axis2/service/DeploymentWebService.java | 25 +++++-- .../apache/ode/bpel/clapi/ClusterManager.java | 22 +++++++ .../bpel/clapi/ProcessStoreDeployedEvent.java | 40 ++++++++++++ .../ode/store/ClusterProcessStoreImpl.java | 43 +++--------- .../hazelcast/HazelcastClusterImpl.java | 69 ++++++++++++++++---- .../hazelcast/HazelcastConstants.java | 1 - .../hazelcast/HazelcastInstanceConfig.java | 56 ---------------- 10 files changed, 175 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/Rakefile ---------------------------------------------------------------------- diff --git a/Rakefile b/Rakefile index 06dcbd5..7c0fa67 100644 --- a/Rakefile +++ b/Rakefile @@ -86,7 +86,7 @@ define "ode" do "scheduler-simple", "bpel-schemas", "bpel-store", "utils", "agents", "clustering"), AXIOM, AXIS2_ALL, COMMONS.lang, COMMONS.collections, COMMONS.httpclient, COMMONS.lang, DERBY, GERONIMO.kernel, GERONIMO.transaction, JAVAX.activation, JAVAX.servlet, JAVAX.stream, - JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J, HAZELCAST + JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J test.exclude 'org.apache.ode.axis2.management.*' test.with project("tools"), AXIOM, JAVAX.javamail, COMMONS.codec, COMMONS.httpclient, XERCES, WOODSTOX @@ -166,7 +166,7 @@ define "ode" do desc "ODE APIs" define "bpel-api" do - compile.with projects("utils", "bpel-obj", "bpel-schemas"), WSDL4J, XERCES, SLF4J, LOG4J, HAZELCAST + compile.with projects("utils", "bpel-obj", "bpel-schemas"), WSDL4J, XERCES, SLF4J, LOG4J package :jar end @@ -208,7 +208,7 @@ define "ode" do desc "ODE Clustering" define "clustering" do - compile.with projects("bpel-api"),HAZELCAST, COMMONS.logging + compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging package :jar end @@ -270,8 +270,8 @@ define "ode" do desc "ODE Process Store" define "bpel-store" do compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-obj", "bpel-schemas", "bpel-epr", - "dao-hibernate", "dao-jpa", "clustering", "utils"), - JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J,HAZELCAST + "dao-hibernate", "dao-jpa", "utils"), + JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J compile { open_jpa_enhance } resources hibernate_doclet(:package=>"org.apache.ode.store.hib", :excludedtags=>"@version,@author,@todo") http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index 26489d2..f0ad470 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -234,7 +234,7 @@ public class ODEServer { try { __log.debug("Initializing Deployment Web Service"); - new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath()); + new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath(),this); } catch (Exception e) { throw new ServletException(e); } @@ -490,8 +490,8 @@ public class ODEServer { private void initClustering() { String clusterImplName = _odeConfig.getClusteringImplClass(); try { - Class clustering_class = this.getClass().getClassLoader().loadClass(clusterImplName); - _clusterManager = (ClusterManager) clustering_class.newInstance(); + Class clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName); + _clusterManager = (ClusterManager) clusterImplClass.newInstance(); } catch (Exception ex) { __log.error(ex); } http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java index 66890ba..ccb029b 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java +++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java @@ -54,7 +54,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl; +import org.apache.ode.bpel.clapi.ClusterManager; /** * Polls a directory for the deployment of a new deployment unit. @@ -100,7 +100,7 @@ public class DeploymentPoller { public DeploymentPoller(File deployDir, final ODEServer odeServer) { _odeServer = odeServer; _deployDir = deployDir; - clusterEnabled = _odeServer.getClusteringState(); + clusterEnabled = _odeServer.getIsCluteringEnabled(); if (!_deployDir.exists()) { boolean isDeployDirCreated = _deployDir.mkdir(); if (!isDeployDirCreated) { @@ -140,10 +140,13 @@ public class DeploymentPoller { // Checking for new deployment directories if (isDeploymentFromODEFileSystemAllowed() && files != null) { for (File file : files) { - __log.info("Trying to access the lock for " +file.getName()); - duLocked = lock(file.getName()); - try { - if (duLocked) { + String test = file.getName(); + __log.info("Trying to access the lock for " + test); + __log.info("Test null key value " +test); + duLocked = pollerTryLock(test); + + if (duLocked) { + try { File deployXml = new File(file, "deploy.xml"); File deployedMarker = new File(_deployDir, file.getName() + ".deployed"); @@ -185,10 +188,10 @@ public class DeploymentPoller { } catch (Exception e) { __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e); } + } finally { + __log.info("Trying to release the lock for " + file.getName()); + unlock(file.getName()); } - } finally { - __log.info("Trying to release the lock for " + file.getName()); - unlock(file.getName()); } } } @@ -341,16 +344,16 @@ public class DeploymentPoller { } //Implementation of IMap key Lock - public boolean lock(String key) { + private boolean pollerTryLock(String key) { if(clusterEnabled) { - return _odeServer.getBpelServer().getContexts().hazelcastClusterImpl.lock(key); + return _odeServer.getBpelServer().getContexts().clusterManager.tryLock(key); } else return true; } - public boolean unlock(String key) { + private boolean unlock(String key) { if(clusterEnabled) { - return _odeServer.getBpelServer().getContexts().hazelcastClusterImpl.unlock(key); + return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key); } else return true; } http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java index bd35167..1951cf5 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java +++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java @@ -55,6 +55,7 @@ import org.apache.axis2.util.Utils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.lang.StringUtils; +import org.apache.ode.axis2.ODEServer; import org.apache.ode.axis2.OdeFault; import org.apache.ode.axis2.deploy.DeploymentPoller; import org.apache.ode.axis2.hooks.ODEAxisService; @@ -76,9 +77,11 @@ public class DeploymentWebService { private final OMNamespace _deployapi; private File _deployPath; + private ODEServer _odeServer; private DeploymentPoller _poller; private ProcessStore _store; + private boolean clusterEnabled; public DeploymentWebService() { _pmapi = OMAbstractFactory.getOMFactory().createOMNamespace("http://www.apache.org/ode/pmapi","pmapi"); @@ -86,10 +89,12 @@ public class DeploymentWebService { } public void enableService(AxisConfiguration axisConfig, ProcessStore store, - DeploymentPoller poller, String rootpath, String workPath) throws AxisFault, WSDLException { + DeploymentPoller poller, String rootpath, String workPath, ODEServer odeServer) throws AxisFault, WSDLException { _deployPath = new File(workPath, "processes"); _store = store; _poller = poller; + _odeServer = odeServer; + clusterEnabled = _odeServer.getIsCluteringEnabled(); Definition def; WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader(); @@ -170,7 +175,7 @@ public class DeploymentWebService { __log.info("Trying to access the lock for " + dest.getName()); //lock on deployment unit directory name - duLocked = _poller.lock(dest.getName()); + duLocked = lock(dest.getName()); if (duLocked) { boolean createDir = dest.mkdir(); @@ -214,7 +219,7 @@ public class DeploymentWebService { sendResponse(factory, messageContext, "deployResponse", response); } finally { __log.info("Trying to release the lock for " + dest.getName()); - _poller.unlock(dest.getName()); + unlock(dest.getName()); } } } finally { @@ -366,6 +371,18 @@ public class DeploymentWebService { out.close(); } + //Implementation of IMap key Lock + private boolean lock(String key) { + if(clusterEnabled) { + return _odeServer.getBpelServer().getContexts().clusterManager.lock(key); + } + else return true; + } - + private boolean unlock(String key) { + if(clusterEnabled) { + return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key); + } + else return true; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java index 4a0aded..a1fe194 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -54,6 +54,28 @@ public interface ClusterManager { */ boolean unlock(String key); + /** + * Tries to acquire the lock for the specified key. + * @param key + * @return + */ + boolean tryLock(String key); + /** + * Set the Process Store object which uses for clustering + * @param ps + */ + void setClusterProcessStore(Object ps); + /** + * Publish Deploy event to the cluster by deploy initiator + * @param event + */ + void publishProcessStoreEvent(Object event); + + /** + * Handle event according to received event + * @param message + */ + void handleEvent(Object message); } http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java new file mode 100644 index 0000000..a623d47 --- /dev/null +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.bpel.clapi; + +import java.io.Serializable; + +public class ProcessStoreDeployedEvent implements Serializable { + private static final long serialVersionUID = 1L; + + public final String deploymentUnit; + + public final String info; + + public ProcessStoreDeployedEvent(String deploymentUnit) { + this.info = "Deployment Event"; + this.deploymentUnit = deploymentUnit; + } + + @Override + public String toString() { + return "{ProcessStoreDeployedEvent#" + deploymentUnit +"}"; + } + +} http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java index 22ba2cd..6f35110 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java @@ -20,9 +20,9 @@ package org.apache.ode.store; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.iapi.ProcessConf; +import org.apache.ode.bpel.clapi.ClusterManager; +import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent; import org.apache.ode.bpel.iapi.ProcessState; -import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl; import org.apache.ode.bpel.iapi.EndpointReferenceContext; import org.apache.ode.il.config.OdeConfigProperties; @@ -33,24 +33,17 @@ import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.hazelcast.core.*; - public class ClusterProcessStoreImpl extends ProcessStoreImpl{ private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class); - private HazelcastInstance _hazelcastInstance; - private Member deployInitiator; - private ITopic clusterMessageTopic; private final ArrayList loaded = new ArrayList(); + private ClusterManager _clusterManager; + private ProcessStoreDeployedEvent deployedEvent; - - public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, HazelcastClusterImpl hazelcastClusterImpl) { + public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) { super(eprContext,ds,persistenceType,props,createDatamodel); - _hazelcastInstance = hazelcastClusterImpl.getHazelcastInstance(); - - // Register for listening to message listener - clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg"); - clusterMessageTopic.addMessageListener(new ClusterMessageListener()); + _clusterManager = clusterManager; + _clusterManager.setClusterProcessStore(this); } public Collection deploy(final File deploymentUnitDirectory) { @@ -63,9 +56,9 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ return deployed; } - public void publishProcessStoreDeployedEvent(String duName){ - deployInitiator = _hazelcastInstance.getCluster().getLocalMember(); - clusterMessageTopic.publish("Deployed " +duName); + private void publishProcessStoreDeployedEvent(String duName){ + deployedEvent = new ProcessStoreDeployedEvent(duName); + _clusterManager.publishProcessStoreEvent(deployedEvent); } public void publishService(final String duName) { @@ -110,22 +103,6 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ //loadAll(); } - class ClusterMessageListener implements MessageListener { - @Override - public void onMessage(Message msg) { - String message = msg.getMessageObject(); - String arr[] = message.split(" ", 2); - String duName = arr[1]; - if(message.contains("Deployed ")) { - if(_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) { - __log.info("Receive deployment msg to " +_hazelcastInstance.getCluster().getLocalMember() +" for " +duName); - publishService(duName); - } - else deployInitiator = null; - } - } - } - private Pattern getPreviousPackageVersionPattern(String duName) { String[] nameParts = duName.split("/"); /* Replace the version number (if any) with regexp to match any version number */ http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 6ae701b..2e6868f 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -19,38 +19,49 @@ package org.apache.ode.clustering.hazelcast; import com.hazelcast.core.*; +import com.hazelcast.config.FileSystemXmlConfig; import java.io.File; +import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ode.store.ClusterProcessStoreImpl; import org.apache.ode.bpel.clapi.ClusterManager; +import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent; /** * This class implements necessary methods to build the cluster using hazelcast */ -public class HazelcastClusterImpl implements ClusterManager{ +public class HazelcastClusterImpl implements ClusterManager { private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class); private HazelcastInstance _hazelcastInstance; private boolean isMaster = false; private Member leader; - + private Member deployInitiator; private IMap lock_map; + private ITopic clusterMessageTopic; + private ClusterProcessStoreImpl _clusterProcessStore; public void init(File configRoot) { - //First,looks for the hazelcast.config system property. If it is set, its value is used as the path. - //Else it will load the hazelcast.xml file using FileSystemXmlConfig() + /*First,looks for the hazelcast.config system property. If it is set, its value is used as the path. + Else it will load the hazelcast.xml file using FileSystemXmlConfig()*/ String hzConfig = System.getProperty("hazelcast.config"); if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance(); else { File hzXml = new File(configRoot, "hazelcast.xml"); if (!hzXml.isFile()) __log.error("hazelcast.xml does not exist or is not a file"); - else _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml)); + else + try { + _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml)); + } catch (FileNotFoundException fnf) { + __log.error(fnf); + } } if (_hazelcastInstance != null) { @@ -60,10 +71,15 @@ public class HazelcastClusterImpl implements ClusterManager{ __log.info("Registering HZ localMember ID " + localMember); markAsMaster(); lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_LOCK_MAP); + + // Register for listening to message listener + clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg"); + clusterMessageTopic.addMessageListener(new ClusterMessageListener()); } } public boolean lock(String key) { + lock_map.putIfAbsent(key,key); lock_map.lock(key); boolean state = lock_map.isLocked(key); __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state); @@ -81,6 +97,13 @@ public class HazelcastClusterImpl implements ClusterManager{ return state; } + public boolean tryLock(String key) { + lock_map.putIfAbsent(key,key); + boolean state = lock_map.tryLock(key); + __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state ); + return state; + } + class ClusterMemberShipListener implements MembershipListener { @Override public void memberAdded(MembershipEvent membershipEvent) { @@ -90,10 +113,6 @@ public class HazelcastClusterImpl implements ClusterManager{ @Override public void memberRemoved(MembershipEvent membershipEvent) { markAsMaster(); - // Allow Leader to update distributed map. - if (isMaster) { - String leftMemberID = getHazelCastNodeID(membershipEvent.getMember()); - } } @Override @@ -102,6 +121,32 @@ public class HazelcastClusterImpl implements ClusterManager{ } } + public void publishProcessStoreEvent(Object deployedEvent) { + deployInitiator = _hazelcastInstance.getCluster().getLocalMember(); + clusterMessageTopic.publish(deployedEvent); + } + + + class ClusterMessageListener implements MessageListener { + @Override + public void onMessage(Message msg) { + handleEvent(msg.getMessageObject()); + } + } + + public void handleEvent(Object message) { + if (message instanceof ProcessStoreDeployedEvent) { + ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message; + + if (_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) { + String duName = event.deploymentUnit; + __log.info("Receive deployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName); + _clusterProcessStore.publishService(duName); + } else deployInitiator = null; + } + + } + public void markAsMaster() { leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); if (leader.localMember()) { @@ -114,7 +159,9 @@ public class HazelcastClusterImpl implements ClusterManager{ return isMaster; } - public HazelcastInstance getHazelcastInstance() { - return _hazelcastInstance; + public void setClusterProcessStore(Object store) { + if (store instanceof ClusterProcessStoreImpl) + _clusterProcessStore = (ClusterProcessStoreImpl) store; } } + http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java index e201b70..f9d1004 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java @@ -21,7 +21,6 @@ package org.apache.ode.clustering.hazelcast; * Constants used in Hazelcast based clustering implementation */ public final class HazelcastConstants { - public static final String ODE_CLUSTER_NODE_MAP = "ODE_NODE_ID_MAP"; public static final String ODE_CLUSTER_LOCK_MAP = "ODE_LOCK_MAP"; private HazelcastConstants() { http://git-wip-us.apache.org/repos/asf/ode/blob/521d640d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java deleted file mode 100644 index 9e8c59b..0000000 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceConfig.java +++ /dev/null @@ -1,56 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.ode.clustering.hazelcast; - -import com.hazelcast.config.Config; -import com.hazelcast.config.FileSystemXmlConfig; -import com.hazelcast.core.*; - -import java.io.File; -import java.io.FileNotFoundException; - -/** - * This is to create hazelcast instance. - * It sets the config object using hazelcast.xml file.First, it looks for the hazelcast.config system property. If it is set, its value is used as the path. - * Else it will load the hazelcast.xml file using FileSystemXmlConfig() - */ -public class HazelcastInstanceConfig { - private HazelcastInstance hazelcastInstance; - - public HazelcastInstanceConfig() { - hazelcastInstance = Hazelcast.newHazelcastInstance(); - } - - /** - * - * @param hzXml - */ - public HazelcastInstanceConfig(File hzXml) { - try { - Config config = new FileSystemXmlConfig(hzXml); - hazelcastInstance = Hazelcast.newHazelcastInstance(config); - } catch (FileNotFoundException fnf) { - } - } - - public HazelcastInstance getHazelcastInstance() { - return hazelcastInstance; - } -} -