airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/2] airavata git commit: Experiment cancel request, Orchestrator side implementation and refactored zookeeper node paths. AIRAVATA-1798
Date Thu, 03 Sep 2015 23:57:36 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 420a031a1 -> be9af52e2


Experiment cancel request, Orchestrator side implementation and refactored zookeeper node
paths. AIRAVATA-1798


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e0483f37
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e0483f37
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e0483f37

Branch: refs/heads/master
Commit: e0483f37fc301d459212736d9b57adc3169666ba
Parents: 6623967
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Thu Sep 3 19:57:17 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Thu Sep 3 19:57:17 2015 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/zkConstants.java      |  32 ++++
 .../airavata/gfac/core/GFacConstants.java       |   8 +-
 .../apache/airavata/gfac/core/GFacUtils.java    |  15 +-
 .../impl/watcher/CancelRequestWatcherImpl.java  |   8 +-
 .../watcher/RedeliveryRequestWatcherImpl.java   |   2 +
 .../airavata/gfac/server/GfacServerHandler.java |  67 +++++---
 .../server/OrchestratorServerHandler.java       | 151 ++++---------------
 7 files changed, 122 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
new file mode 100644
index 0000000..9255e02
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.utils;
+
+public interface ZkConstants {
+
+	public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
+	public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
+	public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
+	public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
+	public static final String ZOOKEEPER_TOKEN_NODE = "/token";
+	public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
+	public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
index b662fff..444956b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -50,13 +50,7 @@ public class GFacConstants {
 	public static final String _127_0_0_1 = "127.0.0.1";
 	public static final String LOCALHOST = "localhost";
 
-	public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
-	public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
-	public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
-	public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
-	public static final String ZOOKEEPER_TOKEN_NODE = "/token";
-	public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener";
-	public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+
 
 	public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
 	public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 784d214..1fca2d0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.DBUtil;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -551,7 +552,7 @@ public class GFacUtils {
      * @throws InterruptedException
      */
     public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient)
throws Exception {
-        String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+        String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
         List<String> children = curatorClient.getChildren().forPath(experimentNode);
         for (String pickedChild : children) {
             String experimentPath = experimentNode + File.separator + pickedChild;
@@ -568,9 +569,9 @@ public class GFacUtils {
 
     public static boolean setExperimentCancelRequest(String processId, CuratorFramework curatorClient,
long
 		    deliveryTag) throws Exception {
-	    String experimentNode = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-	    String cancelListenerNodePath = ZKPaths.makePath(experimentNode, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-	    curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, GFacConstants.ZOOKEEPER_CANCEL_REQEUST
+	    String experimentNode = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+	    String cancelListenerNodePath = ZKPaths.makePath(experimentNode, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+	    curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
 			    .getBytes());
 	    return true;
     }
@@ -768,7 +769,7 @@ public class GFacUtils {
 //    }
 
     public static String getZKGfacServersParentPath() {
-        return ZKPaths.makePath(GFacConstants.ZOOKEEPER_SERVERS_NODE, GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE);
+        return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE);
     }
 
     public static JobDescriptor createJobDescriptor(ProcessContext processContext) throws
GFacException, AppCatalogException, ApplicationSettingsException {
@@ -1113,11 +1114,11 @@ public class GFacUtils {
     }
 
 	public static String getExperimentNodePath(String experimentId) {
-		return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
+		return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
 	}
 
 	public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId)
throws Exception {
-		String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
+		String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + ZkConstants
 				.ZOOKEEPER_DELIVERYTAG_NODE;
 		byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
 		return GFacUtils.bytesToLong(bytes);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index bfeac89..58d2817 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.airavata.gfac.impl.watcher;
 
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.impl.Factory;
@@ -44,7 +45,7 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 				byte[] bytes = curatorClient.getData().forPath(path);
 				String processId = path.substring(path.lastIndexOf("/") + 1);
 				String action = new String(bytes);
-				if (action.equalsIgnoreCase("CANCEL")) {
+				if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
 					ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
 					if (processContext != null) {
 						processContext.setCancel(true);
@@ -56,9 +57,12 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 					curatorClient.getData().usingWatcher(this).forPath(path);
 				}
 				break;
+			case NodeDeleted:
+				//end of experiment execution, ignore this event
+				break;
 			case NodeCreated:
 			case NodeChildrenChanged:
-			case NodeDeleted:
+			case None:
 				curatorClient.getData().usingWatcher(this).forPath(path);
 				break;
 			default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
index 4738edb..dc5317f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -58,6 +58,8 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher
{
 				}
 				break;
 			case NodeDeleted:
+				//end of experiment execution, ignore this event
+				break;
 			case NodeCreated:
 			case NodeChildrenChanged:
 			case None:

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9ebfa05..1040b05 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
@@ -54,6 +55,7 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,7 +118,7 @@ public class GfacServerHandler implements GfacService.Iface {
         airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort();
         // create PERSISTENT nodes
         ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath());
-        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), ZkConstants.ZOOKEEPER_EXPERIMENT_NODE);
         // create EPHEMERAL server name node
         String gfacName = ServerSettings.getGFacServerName();
         if (curatorClient.checkExists().forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath()
,gfacName)) == null) {
@@ -196,7 +198,7 @@ public class GfacServerHandler implements GfacService.Iface {
         private String gfacServerName;
 
         public ProcessLaunchMessageHandler() throws ApplicationSettingsException {
-            experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+            experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
             gfacServerName = ServerSettings.getGFacServerName();
         }
 
@@ -226,8 +228,7 @@ public class GfacServerHandler implements GfacService.Iface {
 		                if (Factory.getGfacContext().getProcess(event.getProcessId()) != null)
{
 			                // update deliver tag
 			                try {
-				                updateDeliveryTag(curatorClient, gfacServerName, event.getProcessId(),
message
-						                .getDeliveryTag());
+				                updateDeliveryTag(curatorClient, gfacServerName, event, message );
 				                return;
 			                } catch (Exception e) {
 				                log.error("Error while updating delivery tag for redelivery message ,
messageId : " +
@@ -254,8 +255,7 @@ public class GfacServerHandler implements GfacService.Iface {
 			                .getProcessId());
 	                publishProcessStatus(event, status);
                     try {
-	                    createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(),
message
-			                    .getDeliveryTag(), event.getTokenId());
+	                    createProcessZKNode(curatorClient, gfacServerName, event, message);
 	                    submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
@@ -306,38 +306,55 @@ public class GfacServerHandler implements GfacService.Iface {
 		statusPublisher.publish(msgCtx);
 	}
 
-	private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName,
String
-			processId, long deliveryTag, String token) throws Exception {
-		// TODO - To handle multiple processes per experiment, need to create a /experiments/{expId}/{processId}
node
-		// create /experiments/{processId} node and set data - serverName, add redelivery listener
-		String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+	private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName,ProcessSubmitEvent
event
+			,MessageContext messageContext) throws Exception {
+		String processId  = event.getProcessId();
+		String token = event.getTokenId();
+		String experimentId = event.getExperimentId();
+		long deliveryTag = messageContext.getDeliveryTag();
+
+		// create /experiments//{experimentId}{processId} node and set data - serverName, add redelivery
listener
+		String experimentNodePath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId;
+		String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
 		curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
 		curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath);
 
-		// create /experiments/{processId}/deliveryTag node and set data - deliveryTag
-		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+		// create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag
+		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);
 		curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
 
-		// create /experiments/{processId}/token node and set data - token
-		String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE);
+		// create /experiments/{experimentId}/{processId}/token node and set data - token
+		String tokenNodePath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_TOKEN_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath);
 		curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes());
 
-		// create /experiments/{processId}/cancelListener node and set watcher for data changes
-		String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+		// create /experiments/{experimentId}/{processId}/cancelListener node and set watcher for
data changes
+/*		String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode);
-		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);
+		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/
+
+		// create /experiments/{experimentId}/cancel node and set watcher for data changes
+		String experimentCancelNode = experimentNodePath + "/" + ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE;
+		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
+		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath (experimentCancelNode);
+
 	}
 
-	private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, String
processId, long
-			deliveryTag) throws Exception {
-		// create /experiments/{processId} node and set data - serverName, add redelivery listener
-		String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-		// create /experiments/{processId}/deliveryTag node and set data - deliveryTag
-		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
-		curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
+	private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent
event,
+	                               MessageContext messageContext) throws Exception {
+		String experimentId = event.getExperimentId();
+		String processId = event.getProcessId();
+		long deliveryTag = messageContext.getDeliveryTag();
+		String processNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+				experimentId), processId);
+		Stat stat = curatorClient.checkExists().forPath(processNodePath);
+		if (stat != null) {
+			// create /experiments/{processId}/deliveryTag node and set data - deliveryTag
+			String deliveryTagPath = ZKPaths.makePath(processNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+			curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e0483f37/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 8427d0c..acb5530 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
@@ -59,8 +60,14 @@ import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractRes
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
 import org.apache.airavata.registry.cpi.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +87,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
 	private String gatewayName;
 	private Publisher publisher;
 	private RabbitMQStatusConsumer statusConsumer;
+	private CuratorFramework curatorClient;
 
     /**
 	 * Query orchestrator server to fetch the CPI version
@@ -109,6 +117,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
 			String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
 			statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
 			statusConsumer.listen(new ProcessStatusHandler());
+			startCurator();
 		} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException
e) {
 			log.error(e.getMessage(), e);
 			throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -209,7 +218,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
 	 */
 	public boolean terminateExperiment(String experimentId, String tokenId) throws TException
{
         log.info(experimentId, "Experiment: {} is cancelling  !!!!!", experimentId);
-        return validateStatesAndCancel(experimentId, tokenId);
+		try {
+			return validateStatesAndCancel(experimentId, tokenId);
+		} catch (Exception e) {
+			log.error("expId : " + experimentId + " :- Error while cancelling experiment", e);
+			return false;
+		}
 	}
 
 	private String getAiravataUserName() {
@@ -277,7 +291,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
         List<ComputeResourceDescription> computeHostList = Arrays.asList(deploymentMap.keySet().toArray(new
ComputeResourceDescription[]{}));
         Class<? extends HostScheduler> aClass = Class.forName(
                 ServerSettings.getHostScheduler()).asSubclass(
-                HostScheduler.class);
+		        HostScheduler.class);
         HostScheduler hostScheduler = aClass.newInstance();
         ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList);
         return deploymentMap.get(ComputeResourceDescription);
@@ -297,124 +311,15 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
 		return selectedModuleId;
 	}
 
-    private boolean validateStatesAndCancel(String experimentId, String tokenId)throws TException{
-        // FIXME
-//        try {
-//            Experiment experiment = (Experiment) experimentCatalog.get(
-//                    ExperimentCatalogModelType.EXPERIMENT, experimentId);
-//			log.info("Waiting for zookeeper to connect to the server");
-//			synchronized (mutex){
-//				mutex.wait(5000);
-//			}
-//            if (experiment == null) {
-//                log.errorId(experimentId, "Error retrieving the Experiment by the given
experimentID: {}.", experimentId);
-//                throw new OrchestratorException("Error retrieving the Experiment by the
given experimentID: " + experimentId);
-//            }
-//            ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
-//            if (isCancelValid(experimentState)){
-//                ExperimentStatus status = new ExperimentStatus();
-//                status.setExperimentState(ExperimentState.CANCELING);
-//                status.setTimeOfStateChange(Calendar.getInstance()
-//                        .getTimeInMillis());
-//                experiment.setExperimentStatus(status);
-//                experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-//                        experimentId);
-//
-//                List<String> ids = experimentCatalog.getIds(
-//                        ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                        WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-//                for (String workflowNodeId : ids) {
-//                    WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) experimentCatalog
-//                            .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                                    workflowNodeId);
-//                    int value = workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue();
-//                    if ( value> 1 && value < 7) { // we skip the unknown
state
-//                        log.error(workflowNodeDetail.getNodeName() + " Workflow Node status
cannot mark as cancelled, because " +
-//                                "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
-//                        continue; // this continue is very useful not to process deeper
loops if the upper layers have non-cancel states
-//                    } else {
-//                        WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-//                        workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
-//                        workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                .getTimeInMillis());
-//                        workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-//                        experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
workflowNodeDetail,
-//                                workflowNodeId);
-//                    }
-//                    List<Object> taskDetailList = experimentCatalog.get(
-//                            ExperimentCatalogModelType.TASK_DETAIL,
-//                            TaskDetailConstants.NODE_ID, workflowNodeId);
-//                    for (Object o : taskDetailList) {
-//                        TaskDetails taskDetails = (TaskDetails) o;
-//                        TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-//                        if (taskStatus.getExecutionState().getValue() > 7 &&
taskStatus.getExecutionState().getValue()<12) {
-//                            log.error(((TaskDetails) o).getTaskID() + " Task status cannot
mark as cancelled, because " +
-//                                    "current task state is " + ((TaskDetails) o).getTaskStatus().getExecutionState().toString());
-//                            continue;// this continue is very useful not to process deeper
loops if the upper layers have non-cancel states
-//                        } else {
-//                            taskStatus.setExecutionState(TaskState.CANCELING);
-//                            taskStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                    .getTimeInMillis());
-//                            taskDetails.setTaskStatus(taskStatus);
-//                            experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL,
o,
-//                                    taskDetails.getTaskID());
-//                        }
-//                        orchestrator.cancelExperiment(experiment,
-//                                workflowNodeDetail, taskDetails, tokenId);
-//                        // Status update should be done at the monitor
-//                    }
-//                }
-//            }else {
-//                if (isCancelAllowed(experimentState)){
-//                    // when experiment status is < 3 no jobDetails object is created,
-//                    // so we don't have to worry, we simply have to change the status and
stop the execution
-//                    ExperimentStatus status = new ExperimentStatus();
-//                    status.setExperimentState(ExperimentState.CANCELED);
-//                    status.setTimeOfStateChange(Calendar.getInstance()
-//                            .getTimeInMillis());
-//                    experiment.setExperimentStatus(status);
-//                    experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-//                            experimentId);
-//                    List<String> ids = experimentCatalog.getIds(
-//                            ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                            WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-//                    for (String workflowNodeId : ids) {
-//                        WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails)
experimentCatalog
-//                                .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                                        workflowNodeId);
-//                        WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-//                        workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
-//                        workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                .getTimeInMillis());
-//                        workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-//                        experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
workflowNodeDetail,
-//                                workflowNodeId);
-//                        List<Object> taskDetailList = experimentCatalog.get(
-//                                ExperimentCatalogModelType.TASK_DETAIL,
-//                                TaskDetailConstants.NODE_ID, workflowNodeId);
-//                        for (Object o : taskDetailList) {
-//                            TaskDetails taskDetails = (TaskDetails) o;
-//                            TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-//                            taskStatus.setExecutionState(TaskState.CANCELED);
-//                            taskStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                    .getTimeInMillis());
-//                            taskDetails.setTaskStatus(taskStatus);
-//                            experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL,
o,
-//                                    taskDetails);
-//                        }
-//                    }
-//                }else {
-//                    log.errorId(experimentId, "Unable to mark experiment as Cancelled,
current state {} doesn't allow to cancel the experiment {}.",
-//                            experiment.getExperimentStatus().getExperimentState().toString(),
experimentId);
-//                    throw new OrchestratorException("Unable to mark experiment as Cancelled,
because current state is: "
-//                            + experiment.getExperimentStatus().getExperimentState().toString());
-//                }
-//            }
-//            log.info("Experiment: " + experimentId + " is cancelled !!!!!");
-//        } catch (Exception e) {
-//            throw new TException(e);
-//        }
-        return true;
+    private boolean validateStatesAndCancel(String experimentId, String tokenId) throws Exception
{
+	    String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+			    experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+	    Stat stat = curatorClient.checkExists().forPath(expCancelNodePath);
+	    if (stat != null) {
+		    curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
+				    .getBytes());
+	    }
+	    return true;
     }
 
     private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken)
throws TException {
@@ -427,6 +332,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
 //        }
     }
 
+	private void startCurator() throws ApplicationSettingsException {
+		String connectionSting = ServerSettings.getZookeeperConnection();
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+		curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+		curatorClient.start();
+	}
     private class SingleAppExperimentRunner implements Runnable {
 
         String experimentId;


Mime
View raw message