ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sath...@apache.org
Subject [04/30] ode git commit: first phase of implementing using locking mechanism
Date Fri, 06 Nov 2015 10:50:59 GMT
first phase of implementing using locking mechanism


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/71f3d35d
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/71f3d35d
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/71f3d35d

Branch: refs/heads/ODE-563
Commit: 71f3d35d7ca766ddfeca8e0884495b9dfc9b7f04
Parents: 764bf64
Author: suba <suba.11@cse.mrt.ac.lk>
Authored: Fri Jun 12 01:24:27 2015 +0530
Committer: suba <suba.11@cse.mrt.ac.lk>
Committed: Fri Jun 12 01:24:27 2015 +0530

----------------------------------------------------------------------
 Rakefile                                        |  2 +-
 .../java/org/apache/ode/axis2/ODEServer.java    |  9 ++-
 .../ode/axis2/deploy/DeploymentPoller.java      |  2 +
 .../ode/axis2/service/DeploymentWebService.java |  2 +
 .../apache/ode/bpel/hzapi/HazelcastCluster.java |  2 +-
 .../ode/store/ClusterProcessStoreImpl.java      | 80 ++++++++++++++++++++
 .../org/apache/ode/store/ProcessStoreImpl.java  |  4 +-
 .../hazelcast/HazelcastClusterImpl.java         | 46 +++++++----
 8 files changed, 126 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/Rakefile
----------------------------------------------------------------------
diff --git a/Rakefile b/Rakefile
index 1320043..5fa4a07 100644
--- a/Rakefile
+++ b/Rakefile
@@ -270,7 +270,7 @@ define "ode" do
   desc "ODE Process Store"
   define "bpel-store" do
     compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-obj", "bpel-schemas",
"bpel-epr",
-      "dao-hibernate", "dao-jpa", "utils"),
+      "dao-hibernate", "dao-jpa", "clustering", "utils"),
       JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES,
WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J
     compile { open_jpa_enhance }
     resources hibernate_doclet(:package=>"org.apache.ode.store.hib", :excludedtags=>"@version,@author,@todo")

http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 1547042..d1f6c36 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -79,6 +79,7 @@ import org.apache.ode.il.dbutil.Database;
 import org.apache.ode.scheduler.simple.JdbcDelegate;
 import org.apache.ode.scheduler.simple.SimpleScheduler;
 import org.apache.ode.store.ProcessStoreImpl;
+import org.apache.ode.store.ClusterProcessStoreImpl;
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
 
@@ -526,13 +527,15 @@ public class ODEServer {
         _store.registerListener(new ProcessStoreListenerImpl());
         _store.setDeployDir(
                 _odeConfig.getDeployDir() != null ?
-                    new File(_odeConfig.getDeployDir()) :
-                    new File(_workRoot, "processes"));
+                        new File(_odeConfig.getDeployDir()) :
+                        new File(_workRoot, "processes"));
         _store.setConfigDir(_configRoot);
     }
 
     protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource
ds) {
-        return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(),
_odeConfig, false);
+        if (isClusteringEnabled)
+            return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(),
_odeConfig, false, hazelcastClusterImpl);
+        else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(),
_odeConfig, false);
     }
 
     protected Scheduler createScheduler() {

http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index 25d7f20..66890ba 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -140,6 +140,7 @@ public class DeploymentPoller {
         // Checking for new deployment directories
         if (isDeploymentFromODEFileSystemAllowed() && files != null) {
             for (File file : files) {
+                __log.info("Trying to access the lock for " +file.getName());
                 duLocked = lock(file.getName());
                 try {
                     if (duLocked) {
@@ -186,6 +187,7 @@ public class DeploymentPoller {
                         }
                     }
                 } finally {
+                    __log.info("Trying to release the lock for " + file.getName());
                     unlock(file.getName());
                 }
             }

http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index 1c09bb3..bd35167 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -167,6 +167,7 @@ public class DeploymentWebService {
                         _poller.hold();
 
                         File dest = new File(_deployPath, bundleName + "-" + _store.getCurrentVersion());
+                        __log.info("Trying to access the lock for " + dest.getName());
 
                         //lock on deployment unit directory name
                         duLocked = _poller.lock(dest.getName());
@@ -212,6 +213,7 @@ public class DeploymentWebService {
                                 }
                                 sendResponse(factory, messageContext, "deployResponse", response);
                             } finally {
+                                __log.info("Trying to release the lock for " + dest.getName());
                                 _poller.unlock(dest.getName());
                             }
                         }

http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java b/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java
index 4e03c7d..adca32c 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java
@@ -39,7 +39,7 @@ public interface HazelcastCluster {
     /**
      * Check whether current node is the leader or not.
      */
-    void isLeader();
+     void markAsMaster();
 
     /**
      * returns Current Nodes in the cluster.

http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
new file mode 100644
index 0000000..c6f81ba
--- /dev/null
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.store;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl;
+import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.il.config.OdeConfigProperties;
+
+import javax.sql.DataSource;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collection;
+
+public class ClusterProcessStoreImpl extends ProcessStoreImpl{
+    private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class);
+
+    private HazelcastClusterImpl _hazelcastClusterImpl;
+
+    public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String
persistenceType, OdeConfigProperties props, boolean createDatamodel, HazelcastClusterImpl
hazelcastClusterImpl) {
+        super();
+        _hazelcastClusterImpl = hazelcastClusterImpl;
+    }
+
+    public Collection<QName> deploy(final File deploymentUnitDirectory) {
+        Collection<QName> deployed = super.deploy(deploymentUnitDirectory);
+        publishProcessStoreDeployedEvent(deploymentUnitDirectory.getName());
+        return deployed;
+    }
+
+    public void publishProcessStoreDeployedEvent(String duName){
+       String returnedDuName = _hazelcastClusterImpl.publishProcessStoreEvent("Deployed "
+duName);
+       publishService(returnedDuName);
+    }
+
+    public void publishService(final String duName) {
+        final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();
+        try {
+            exec(new Callable<Object>() {
+                public Object call(ConfStoreConnection conn) {
+                    DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);
+                    if (dudao != null) {
+                        loaded.addAll(load(dudao));
+                    }
+                    return null;
+                }
+            });
+        } catch (Exception ex) {
+            __log.error("Error loading DU from store: " + duName, ex);
+        }
+
+        for (ProcessConfImpl p : loaded) {
+            try {
+                fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName());
+            } catch (Exception except) {
+                __log.error("Error while activating process: pid=" + p.getProcessId() + "
package="+p.getDeploymentUnit().getName(), except);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index b689bd1..1a99ef6 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -88,6 +88,8 @@ public class ProcessStoreImpl implements ProcessStore {
 
     protected File _configDir;
 
+
+
     /**
      * Executor used to process DB transactions. Allows us to isolate the TX context, and
to ensure that only one TX gets executed a
      * time. We don't really care to parallelize these operations because: i) HSQL does not
isolate transactions and we don't want
@@ -592,7 +594,7 @@ public class ProcessStoreImpl implements ProcessStore {
             psl.onProcessStoreEvent(pse);
     }
 
-    private void fireStateChange(QName processId, ProcessState state, String duname) {
+    protected void fireStateChange(QName processId, ProcessState state, String duname) {
         switch (state) {
             case ACTIVE:
                 fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.ACTVIATED, processId,
duname));

http://git-wip-us.apache.org/repos/asf/ode/blob/71f3d35d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index c387b74..0e53de4 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -36,10 +36,12 @@ public class HazelcastClusterImpl implements HazelcastCluster{
 
     private HazelcastInstance _hazelcastInstance;
     private boolean isMaster = false;
-    private String message = "";
+    private String _duName = "";
     private Member leader;
+    private Member deployInitiator;
 
     private IMap<String, String> lock_map;
+    private ITopic<String> clusterMessageTopic;
 
     public HazelcastClusterImpl(HazelcastInstance hazelcastInstance) {
         _hazelcastInstance = hazelcastInstance;
@@ -51,7 +53,7 @@ public class HazelcastClusterImpl implements HazelcastCluster{
         _hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener());
 
         // Register for listening to message listener
-        ITopic<String> clusterMessageTopic = _hazelcastInstance.getTopic("clusterMsg");
+        clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg");
         clusterMessageTopic.addMessageListener(new ClusterMessageListener());
 
         Member localMember = _hazelcastInstance.getCluster().getLocalMember();
@@ -72,18 +74,18 @@ public class HazelcastClusterImpl implements HazelcastCluster{
     public boolean lock(String key) {
         lock_map.lock(key);
         boolean state = lock_map.isLocked(key);
-        if (__log.isDebugEnabled()) {
-        __log.debug ("ThreadID:" + Thread.currentThread().getId() + " duLocked value for
" + key + " file" + " after locking: " + state);
-        }
+        __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for "
+ key + " file" + " after locking: " + state);
         return state;
     }
 
     public boolean unlock(String key) {
         lock_map.unlock(key);
-        boolean state = lock_map.isLocked(key);
-        if (__log.isDebugEnabled()) {
-        __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for "
+ key + " file" + " after unlocking: " + state);
+        try {
+            Thread.sleep(10);
+        } catch (InterruptedException e) {
         }
+        boolean state = lock_map.isLocked(key);
+        __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for "
+ key + " file" + " after unlocking: " + state);
         return state;
     }
 
@@ -96,12 +98,12 @@ public class HazelcastClusterImpl implements HazelcastCluster{
 
         @Override
         public void memberRemoved(MembershipEvent membershipEvent) {
-            isLeader();
+            markAsMaster();
             // Allow Leader to update distributed map.
             if (isMaster) {
                 String leftMemberID = getHazelCastNodeID(membershipEvent.getMember());
-                _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).remove(leftMemberID);
-                _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).replace(getHazelCastNodeID(leader),
isMaster);
+               // _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).remove(leftMemberID);
+               // _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).replace(getHazelCastNodeID(leader),
isMaster);
             }
         }
 
@@ -114,12 +116,20 @@ public class HazelcastClusterImpl implements HazelcastCluster{
     class ClusterMessageListener implements MessageListener<String> {
         @Override
         public void onMessage(Message<String> msg) {
-            message = msg.getMessageObject();
+            String message = msg.getMessageObject();
+            String arr[] = message.split(" ", 2);
+            String duName = arr[1];
+            if(message.contains("Deployed ")) {
+                if(_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) {
+                    setDUName(duName);
+                    __log.info("Recerive deployment msg to " +_hazelcastInstance.getCluster().getLocalMember()
+"for" +duName);
+                }
+            }
         }
     }
 
 
-    public void isLeader() {
+    public void markAsMaster() {
         leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
         if (leader.localMember()) {
             isMaster = true;
@@ -139,8 +149,14 @@ public class HazelcastClusterImpl implements HazelcastCluster{
         return isMaster;
     }
 
-    public String getMessage() {
-        return message;
+    public void setDUName(String duName) {
+        _duName = duName;
+    }
+
+    public String publishProcessStoreEvent(String msg) {
+        deployInitiator = _hazelcastInstance.getCluster().getLocalMember();
+        clusterMessageTopic.publish(msg);
+        return _duName;
     }
 
 }


Mime
View raw message