Return-Path: X-Original-To: apmail-ode-commits-archive@www.apache.org Delivered-To: apmail-ode-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9E8CD17CD7 for ; Fri, 6 Nov 2015 10:55:11 +0000 (UTC) Received: (qmail 91410 invoked by uid 500); 6 Nov 2015 10:55:11 -0000 Delivered-To: apmail-ode-commits-archive@ode.apache.org Received: (qmail 91351 invoked by uid 500); 6 Nov 2015 10:55:11 -0000 Mailing-List: contact commits-help@ode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ode.apache.org Delivered-To: mailing list commits@ode.apache.org Received: (qmail 91241 invoked by uid 99); 6 Nov 2015 10:55:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Nov 2015 10:55:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 48573E03D0; Fri, 6 Nov 2015 10:55:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sathwik@apache.org To: commits@ode.apache.org Date: Fri, 06 Nov 2015 10:55:14 -0000 Message-Id: <19888877bbe943e5beddec3259bbdd51@git.apache.org> In-Reply-To: <2be1c5c173c842aca88ac670dc797713@git.apache.org> References: <2be1c5c173c842aca88ac670dc797713@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/30] ode git commit: first phase of implementing using locking mechanism first phase of implementing using locking mechanism Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/71f3d35d Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/71f3d35d Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/71f3d35d Branch: refs/heads/master Commit: 71f3d35d7ca766ddfeca8e0884495b9dfc9b7f04 Parents: 764bf64 Author: suba Authored: Fri Jun 12 01:24:27 2015 +0530 Committer: suba Committed: Fri Jun 12 01:24:27 2015 +0530 ---------------------------------------------------------------------- Rakefile | 2 +- .../java/org/apache/ode/axis2/ODEServer.java | 9 ++- .../ode/axis2/deploy/DeploymentPoller.java | 2 + .../ode/axis2/service/DeploymentWebService.java | 2 + .../apache/ode/bpel/hzapi/HazelcastCluster.java | 2 +- .../ode/store/ClusterProcessStoreImpl.java | 80 ++++++++++++++++++++ .../org/apache/ode/store/ProcessStoreImpl.java | 4 +- .../hazelcast/HazelcastClusterImpl.java | 46 +++++++---- 8 files changed, 126 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/Rakefile ---------------------------------------------------------------------- diff --git a/Rakefile b/Rakefile index 1320043..5fa4a07 100644 --- a/Rakefile +++ b/Rakefile @@ -270,7 +270,7 @@ define "ode" do desc "ODE Process Store" define "bpel-store" do compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-obj", "bpel-schemas", "bpel-epr", - "dao-hibernate", "dao-jpa", "utils"), + "dao-hibernate", "dao-jpa", "clustering", "utils"), JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J compile { open_jpa_enhance } resources hibernate_doclet(:package=>"org.apache.ode.store.hib", :excludedtags=>"@version,@author,@todo") http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index 1547042..d1f6c36 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -79,6 +79,7 @@ import org.apache.ode.il.dbutil.Database; import org.apache.ode.scheduler.simple.JdbcDelegate; import org.apache.ode.scheduler.simple.SimpleScheduler; import org.apache.ode.store.ProcessStoreImpl; +import org.apache.ode.store.ClusterProcessStoreImpl; import org.apache.ode.utils.GUID; import org.apache.ode.utils.fs.TempFileManager; @@ -526,13 +527,15 @@ public class ODEServer { _store.registerListener(new ProcessStoreListenerImpl()); _store.setDeployDir( _odeConfig.getDeployDir() != null ? - new File(_odeConfig.getDeployDir()) : - new File(_workRoot, "processes")); + new File(_odeConfig.getDeployDir()) : + new File(_workRoot, "processes")); _store.setConfigDir(_configRoot); } protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource ds) { - return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false); + if (isClusteringEnabled) + return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, hazelcastClusterImpl); + else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false); } protected Scheduler createScheduler() { http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java index 25d7f20..66890ba 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java +++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java @@ -140,6 +140,7 @@ public class DeploymentPoller { // Checking for new deployment directories if (isDeploymentFromODEFileSystemAllowed() && files != null) { for (File file : files) { + __log.info("Trying to access the lock for " +file.getName()); duLocked = lock(file.getName()); try { if (duLocked) { @@ -186,6 +187,7 @@ public class DeploymentPoller { } } } finally { + __log.info("Trying to release the lock for " + file.getName()); unlock(file.getName()); } } http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java index 1c09bb3..bd35167 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java +++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java @@ -167,6 +167,7 @@ public class DeploymentWebService { _poller.hold(); File dest = new File(_deployPath, bundleName + "-" + _store.getCurrentVersion()); + __log.info("Trying to access the lock for " + dest.getName()); //lock on deployment unit directory name duLocked = _poller.lock(dest.getName()); @@ -212,6 +213,7 @@ public class DeploymentWebService { } sendResponse(factory, messageContext, "deployResponse", response); } finally { + __log.info("Trying to release the lock for " + dest.getName()); _poller.unlock(dest.getName()); } } http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java b/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java index 4e03c7d..adca32c 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java @@ -39,7 +39,7 @@ public interface HazelcastCluster { /** * Check whether current node is the leader or not. */ - void isLeader(); + void markAsMaster(); /** * returns Current Nodes in the cluster. http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java new file mode 100644 index 0000000..c6f81ba --- /dev/null +++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.store; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl; +import org.apache.ode.bpel.iapi.EndpointReferenceContext; +import org.apache.ode.il.config.OdeConfigProperties; + +import javax.sql.DataSource; +import javax.xml.namespace.QName; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Collection; + +public class ClusterProcessStoreImpl extends ProcessStoreImpl{ + private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class); + + private HazelcastClusterImpl _hazelcastClusterImpl; + + public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, HazelcastClusterImpl hazelcastClusterImpl) { + super(); + _hazelcastClusterImpl = hazelcastClusterImpl; + } + + public Collection deploy(final File deploymentUnitDirectory) { + Collection deployed = super.deploy(deploymentUnitDirectory); + publishProcessStoreDeployedEvent(deploymentUnitDirectory.getName()); + return deployed; + } + + public void publishProcessStoreDeployedEvent(String duName){ + String returnedDuName = _hazelcastClusterImpl.publishProcessStoreEvent("Deployed " +duName); + publishService(returnedDuName); + } + + public void publishService(final String duName) { + final ArrayList loaded = new ArrayList(); + try { + exec(new Callable() { + public Object call(ConfStoreConnection conn) { + DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName); + if (dudao != null) { + loaded.addAll(load(dudao)); + } + return null; + } + }); + } catch (Exception ex) { + __log.error("Error loading DU from store: " + duName, ex); + } + + for (ProcessConfImpl p : loaded) { + try { + fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName()); + } catch (Exception except) { + __log.error("Error while activating process: pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java index b689bd1..1a99ef6 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java @@ -88,6 +88,8 @@ public class ProcessStoreImpl implements ProcessStore { protected File _configDir; + + /** * Executor used to process DB transactions. Allows us to isolate the TX context, and to ensure that only one TX gets executed a * time. We don't really care to parallelize these operations because: i) HSQL does not isolate transactions and we don't want @@ -592,7 +594,7 @@ public class ProcessStoreImpl implements ProcessStore { psl.onProcessStoreEvent(pse); } - private void fireStateChange(QName processId, ProcessState state, String duname) { + protected void fireStateChange(QName processId, ProcessState state, String duname) { switch (state) { case ACTIVE: fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.ACTVIATED, processId, duname)); http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index c387b74..0e53de4 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -36,10 +36,12 @@ public class HazelcastClusterImpl implements HazelcastCluster{ private HazelcastInstance _hazelcastInstance; private boolean isMaster = false; - private String message = ""; + private String _duName = ""; private Member leader; + private Member deployInitiator; private IMap lock_map; + private ITopic clusterMessageTopic; public HazelcastClusterImpl(HazelcastInstance hazelcastInstance) { _hazelcastInstance = hazelcastInstance; @@ -51,7 +53,7 @@ public class HazelcastClusterImpl implements HazelcastCluster{ _hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener()); // Register for listening to message listener - ITopic clusterMessageTopic = _hazelcastInstance.getTopic("clusterMsg"); + clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg"); clusterMessageTopic.addMessageListener(new ClusterMessageListener()); Member localMember = _hazelcastInstance.getCluster().getLocalMember(); @@ -72,18 +74,18 @@ public class HazelcastClusterImpl implements HazelcastCluster{ public boolean lock(String key) { lock_map.lock(key); boolean state = lock_map.isLocked(key); - if (__log.isDebugEnabled()) { - __log.debug ("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state); - } + __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state); return state; } public boolean unlock(String key) { lock_map.unlock(key); - boolean state = lock_map.isLocked(key); - if (__log.isDebugEnabled()) { - __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after unlocking: " + state); + try { + Thread.sleep(10); + } catch (InterruptedException e) { } + boolean state = lock_map.isLocked(key); + __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after unlocking: " + state); return state; } @@ -96,12 +98,12 @@ public class HazelcastClusterImpl implements HazelcastCluster{ @Override public void memberRemoved(MembershipEvent membershipEvent) { - isLeader(); + markAsMaster(); // Allow Leader to update distributed map. if (isMaster) { String leftMemberID = getHazelCastNodeID(membershipEvent.getMember()); - _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).remove(leftMemberID); - _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).replace(getHazelCastNodeID(leader), isMaster); + // _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).remove(leftMemberID); + // _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).replace(getHazelCastNodeID(leader), isMaster); } } @@ -114,12 +116,20 @@ public class HazelcastClusterImpl implements HazelcastCluster{ class ClusterMessageListener implements MessageListener { @Override public void onMessage(Message msg) { - message = msg.getMessageObject(); + String message = msg.getMessageObject(); + String arr[] = message.split(" ", 2); + String duName = arr[1]; + if(message.contains("Deployed ")) { + if(_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) { + setDUName(duName); + __log.info("Recerive deployment msg to " +_hazelcastInstance.getCluster().getLocalMember() +"for" +duName); + } + } } } - public void isLeader() { + public void markAsMaster() { leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); if (leader.localMember()) { isMaster = true; @@ -139,8 +149,14 @@ public class HazelcastClusterImpl implements HazelcastCluster{ return isMaster; } - public String getMessage() { - return message; + public void setDUName(String duName) { + _duName = duName; + } + + public String publishProcessStoreEvent(String msg) { + deployInitiator = _hazelcastInstance.getCluster().getLocalMember(); + clusterMessageTopic.publish(msg); + return _duName; } }