airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject airavata git commit: fixing gfac status updating with worker queues
Date Wed, 01 Apr 2015 16:38:36 GMT
Repository: airavata
Updated Branches:
  refs/heads/master ac9dc30f5 -> 764a23933


fixing gfac status updating with worker queues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/764a2393
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/764a2393
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/764a2393

Branch: refs/heads/master
Commit: 764a23933225075b7a36146d284e9d238d3d280a
Parents: ac9dc30
Author: Lahiru Gunathilake <glahiru@gmail.com>
Authored: Wed Apr 1 12:38:32 2015 -0400
Committer: Lahiru Gunathilake <glahiru@gmail.com>
Committed: Wed Apr 1 12:38:32 2015 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        | 43 +++++++++--
 .../client/samples/CreateLaunchExperiment.java  |  6 +-
 .../airavata/common/utils/AiravataZKUtils.java  | 31 +++++++-
 .../airavata/gfac/server/GfacServerHandler.java |  2 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  4 +-
 .../AiravataWorkflowNodeStatusUpdator.java      |  2 +
 .../core/monitor/GfacInternalStatusUpdator.java | 17 -----
 .../airavata/gfac/core/utils/GFacUtils.java     | 75 +++++++++++---------
 .../handlers/GridPullMonitorHandler.java        |  2 +-
 .../util/OrchestratorRecoveryHandler.java       |  2 +-
 10 files changed, 118 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index 1efa506..aabba55 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -22,12 +22,13 @@ package org.apache.airavata.api.server.listener;
 
 import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.api.server.util.DataModelUtils;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.*;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
@@ -36,18 +37,29 @@ import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Calendar;
 
 public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
 
     private Registry airavataRegistry;
+
     private MonitorPublisher monitorPublisher;
+
     private Publisher publisher;
 
+    private ZooKeeper zk;
+
+    private RabbitMQTaskLaunchConsumer consumer;
+
+
     public Registry getAiravataRegistry() {
         return airavataRegistry;
     }
@@ -61,7 +73,9 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 		try {
 			boolean updateExperimentStatus=true;
 			ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT,
nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
-			
+            String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
+            String experimentPath = experimentNode + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
+                    + File.separator + nodeStatus.getWorkflowNodeIdentity().getExperimentId();
 	        ExperimentState state = ExperimentState.UNKNOWN;
 	        switch (nodeStatus.getState()) {
 	            case CANCELED:
@@ -73,19 +87,23 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 	            	}else{
 	                state = ExperimentState.EXECUTING; updateExperimentStatus = true;
 	                }
+
+                    cleanup(nodeStatus, experimentNode, experimentPath);
 	                break;
 	            case INVOKED:
 	                state = ExperimentState.LAUNCHED; updateExperimentStatus = false;
 	                break;
 	            case FAILED:
 	                state = ExperimentState.FAILED; updateExperimentStatus = true;
+                    cleanup(nodeStatus,experimentNode,experimentPath);
 	                break;
 	            case EXECUTING:
 	                state = ExperimentState.EXECUTING; updateExperimentStatus = true;
 	                break;
 	            case CANCELING:
 	                state = ExperimentState.CANCELING; updateExperimentStatus = true;
-	                break;
+                    cleanup(nodeStatus,experimentNode,experimentPath);
+                    break;
 	            default:
 	                return;
 	        }
@@ -109,7 +127,15 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
             throw new Exception("Error persisting experiment status..", e);
 		}
     }
-    
+
+    private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws ApplicationSettingsException, KeeperException, InterruptedException, AiravataException {
+        if (ServerSettings.isGFacPassiveMode()) {
+            consumer.sendAck(AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
+        }
+        ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX);
+        ZKUtil.deleteRecursive(zk, experimentPath);
+    }
+
     public  ExperimentState updateExperimentStatus(String experimentId, ExperimentState state)
throws Exception {
     	Experiment details = (Experiment)airavataRegistry.get(RegistryModelType.EXPERIMENT,
experimentId);
         if(details == null) {
@@ -140,7 +166,12 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 				this.monitorPublisher=(MonitorPublisher) configuration;
 			} else if (configuration instanceof Publisher){
                 this.publisher=(Publisher) configuration;
+            }else if (configuration instanceof RabbitMQTaskLaunchConsumer) {
+                this.consumer = (RabbitMQTaskLaunchConsumer) configuration;
+            }else if (configuration instanceof ZooKeeper) {
+                this.zk = (ZooKeeper) configuration;
             }
+
         }
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 4274fbe..0652fbc 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -58,7 +58,7 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_GATEWAY = "php_reference_gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_5dd52cd4-f9a0-459f-9baf-f8e715e44548";
+    private static String echoAppId = "Echo_11afd5ec-e04e-45c9-843b-0c5b28a617f8";
     private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = "Amber_aa083c86-4680-4002-b3ef-fad93c181926";
@@ -165,10 +165,10 @@ public class CreateLaunchExperiment {
             for (int i = 0; i < 1; i++) {
 //                final String expId = createExperimentForSSHHost(airavata);
 //                final String expId = createEchoExperimentForFSD(airavataClient);
-                final String expId = createMPIExperimentForFSD(airavataClient);
+//                final String expId = createMPIExperimentForFSD(airavataClient);
 //               final String expId = createEchoExperimentForStampede(airavataClient);
 //                final String expId = createEchoExperimentForTrestles(airavataClient);
-//                final String expId = createExperimentEchoForLocalHost(airavataClient);
+                final String expId = createExperimentEchoForLocalHost(airavataClient);
                 experimentIds.add(expId);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index a0cc142..11c03fb 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.common.utils;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -45,18 +46,20 @@ public class AiravataZKUtils {
 
     public static final String ZK_EXPERIMENT_STATE_NODE = "state";
 
+    public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
+
     public static String getExpZnodePath(String experimentId, String taskId) throws ApplicationSettingsException
{
         return ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE) +
                 File.separator +
                 ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator
-                + experimentId + "+" + taskId;
+                + experimentId;
     }
 
     public static String getExpZnodeHandlerPath(String experimentId, String taskId, String
className) throws ApplicationSettingsException {
         return ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE) +
                 File.separator +
                 ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator
-                + experimentId + "+" + taskId + File.separator + className;
+                + experimentId + File.separator + className;
     }
 
     public static String getZKhostPort() throws ApplicationSettingsException {
@@ -196,6 +199,30 @@ public class AiravataZKUtils {
         return bytes;
     }
 
+    public static long getDeliveryTag(String experimentID, ZooKeeper zk, String experimentNode,
+                                      String pickedChild) throws KeeperException, InterruptedException,AiravataException {
+        String experimentPath = experimentNode + File.separator + pickedChild;
+        String deliveryTagPath = experimentPath + File.separator + experimentID
+                + DELIVERY_TAG_POSTFIX;
+        Stat exists = zk.exists(deliveryTagPath, false);
+        if(exists==null) {
+            throw new AiravataException("Cannot find delivery Tag for this experiment");
+        }
+        return bytesToLong(zk.getData(deliveryTagPath, false, exists));
+    }
+    public static byte[] longToBytes(long x) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(x);
+        return buffer.array();
+    }
+
+    public static long bytesToLong(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.put(bytes);
+        buffer.flip();//need flip
+        return buffer.getLong();
+    }
+
     public static double toDouble(byte[] bytes) {
         return ByteBuffer.wrap(bytes).getDouble();
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index cf83aaa..f2c5075 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -239,7 +239,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
     public boolean submitJob(String experimentId, String taskId, String gatewayId) throws
TException {
         requestCount++;
         logger.info("-----------------------------------------------------" + requestCount+"-----------------------------------------------------");
-        logger.infoId(experimentId, "GFac Received submit jog request for the Experiment: {} TaskId: {}", experimentId, taskId);
+        logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId);
         GFac gfac = getGfac();
         InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId,
taskId, gatewayId);
 //        try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 3cd07c6..8d09a09 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -478,7 +478,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         // We need to check whether this job is submitted as a part of a large workflow.
If yes,
         // we need to setup workflow tracking listerner.
         try {
-            String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID(), zk);
+            String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
zk);
             Stat exists = zk.exists(experimentEntry + File.separator + "operation", false);
             zk.getData(experimentEntry + File.separator + "operation", this, exists);
             int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
  // this is the original state came, if we query again it might be different,so we preserve
this state in the environment
@@ -534,7 +534,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         // we need to setup workflow tracking listener.
         try {
             // we cannot call GFacUtils.getZKExperimentStateValue because experiment might
be running in some other node
-            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID(), zk);
+            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
zk);
             int stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath);   // this is
the original state came, if we query again it might be different,so we preserve this state
in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // immediately we
get the request we update the status

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index 8a93c40..cf90987 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -49,6 +49,8 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
     private Publisher publisher;
 
 
+
+
     public Registry getAiravataRegistry() {
         return airavataRegistry;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 6c456b0..d03237e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -47,8 +47,6 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener,
Watc
 
     private static Integer mutex = -1;
 
-    private RabbitMQTaskLaunchConsumer consumer;
-
     @Subscribe
     public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws Exception
{
         logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString());
@@ -94,22 +92,10 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener,
Watc
             case COMPLETED:
                 logger.info("Experiment Completed, So removing the ZK entry for the experiment"
+ monitorID.getExperimentID());
                 logger.info("Zookeeper experiment Path: " + experimentPath);
-                if (ServerSettings.isGFacPassiveMode()) {
-                    consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
-                            monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
-                }
-                ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
-                ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             case FAILED:
                 logger.info("Experiment Failed, So removing the ZK entry for the experiment"
+ monitorID.getExperimentID());
                 logger.info("Zookeeper experiment Path: " + experimentPath);
-                if (ServerSettings.isGFacPassiveMode()) {
-                    consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
-                            monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
-                }
-                ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
-                ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             default:
         }
@@ -120,9 +106,6 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener,
Watc
             if (configuration instanceof ZooKeeper) {
                 this.zk = (ZooKeeper) configuration;
             }
-            if (configuration instanceof RabbitMQTaskLaunchConsumer) {
-                this.consumer = (RabbitMQTaskLaunchConsumer) configuration;
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index a0fc2cb..80bc37f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -20,6 +20,7 @@
  */
 package org.apache.airavata.gfac.core.utils;
 
+import edu.uiuc.ncsa.security.delegation.services.Server;
 import org.airavata.appcatalog.cpi.AppCatalog;
 import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
@@ -67,7 +68,6 @@ import java.util.*;
 
 public class GFacUtils {
 	private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
-	public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
 
 	private GFacUtils() {
 	}
@@ -1051,10 +1051,9 @@ public class GFacUtils {
 													  String pickedChild, String tokenId) throws KeeperException,
 			InterruptedException {
 		String experimentPath = experimentNode + File.separator + pickedChild;
-		String newExpNode = experimentPath + File.separator + experimentID
-				+ "+" + taskID;
+		String newExpNode = experimentPath + File.separator + experimentID;
         Stat exists1 = zk.exists(newExpNode, false);
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
+        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
         String foundExperimentPath = null;
 		if (exists1 == null && experimentEntry == null) {  // this means this is a very
new experiment
 			List<String> runningGfacNodeNames = AiravataZKUtils
@@ -1063,8 +1062,7 @@ public class GFacUtils {
 			for (String gfacServerNode : runningGfacNodeNames) {
 				if (!gfacServerNode.equals(pickedChild)) {
 					foundExperimentPath = experimentNode + File.separator
-							+ gfacServerNode + File.separator + experimentID
-							+ "+" + taskID;
+							+ gfacServerNode + File.separator + experimentID;
 					exists1 = zk.exists(foundExperimentPath, false);
 					if (exists1 != null) { // when the experiment is found we
 											// break the loop
@@ -1131,7 +1129,7 @@ public class GFacUtils {
 				log.info("Deleting experiment data: " + foundExperimentPath);
 				ZKUtil.deleteRecursive(zk, foundExperimentPath);
 			}
-		}else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk)
){
+		}else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,zk) ){
             // this happens when a cancel request comes to a differnt gfac node, in this
case we do not move gfac experiment
             // node to gfac node specific location, because original request execution will
fail with errors
             log.error("This experiment is already cancelled and its already executing the
cancel operation so cannot submit again !");
@@ -1145,8 +1143,7 @@ public class GFacUtils {
             for (String gfacServerNode : runningGfacNodeNames) {
                 if (!gfacServerNode.equals(pickedChild)) {
                     foundExperimentPath = experimentNode + File.separator
-                            + gfacServerNode + File.separator + experimentID
-                            + "+" + taskID;
+                            + gfacServerNode + File.separator + experimentID;
                     break;
                 }
             }
@@ -1161,10 +1158,9 @@ public class GFacUtils {
 														  String pickedChild, String tokenId, long deliveryTag) throws KeeperException,
 			InterruptedException, ApplicationSettingsException {
 		String experimentPath = experimentNode + File.separator + pickedChild;
-		String newExpNode = experimentPath + File.separator + experimentID
-				+ "+" + taskID;
+		String newExpNode = experimentPath + File.separator + experimentID;
 		Stat exists1 = zk.exists(newExpNode, false);
-		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
+		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
 		if (exists1 != null) {
 			log.error("This request is wrong because its already running in the same instance");
 			return false;
@@ -1186,7 +1182,7 @@ public class GFacUtils {
 			String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
 					CreateMode.PERSISTENT);
 			zk.exists(s1, true);// we want to know when this node get deleted
-			String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,
 // here we store the value of delivery message
+			String s2 = zk.create(newExpNode + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag),
ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
 					CreateMode.PERSISTENT);
 		} else {
 			log.error("ExperimentID: " + experimentID + " taskID: " + taskID
@@ -1237,16 +1233,14 @@ public class GFacUtils {
 	 * @throws KeeperException
 	 * @throws InterruptedException
 	 */
-    public static String findExperimentEntry(String experimentID,
-                                                String taskID, ZooKeeper zk
+    public static String findExperimentEntry(String experimentID, ZooKeeper zk
                                                 ) throws KeeperException,
             InterruptedException {
         String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
         List<String> children = zk.getChildren(experimentNode, false);
         for(String pickedChild:children) {
             String experimentPath = experimentNode + File.separator + pickedChild;
-            String newExpNode = experimentPath + File.separator + experimentID
-                    + "+" + taskID;
+            String newExpNode = experimentPath + File.separator + experimentID;
             Stat exists = zk.exists(newExpNode, false);
             if(exists == null){
                 continue;
@@ -1257,9 +1251,36 @@ public class GFacUtils {
         return null;
     }
 
+	/**
+	 * This will return a value if the server is down because we iterate through exisiting experiment
nodes, not
+	 * through gfac-server nodes
+	 * @param experimentID
+	 * @param zk
+	 * @return
+	 * @throws KeeperException
+	 * @throws InterruptedException
+	 */
+	public static String findExperimentEntryPassive(String experimentID, ZooKeeper zk
+	) throws KeeperException,
+			InterruptedException {
+		String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
+		List<String> children = zk.getChildren(experimentNode, false);
+		for(String pickedChild:children) {
+			String experimentPath = experimentNode + File.separator + pickedChild;
+			String newExpNode = experimentPath + File.separator + experimentID;
+			Stat exists = zk.exists(newExpNode, false);
+			if(exists == null){
+				continue;
+			}else{
+				return newExpNode;
+			}
+		}
+		return null;
+	}
+
     public static void setExperimentCancel(String experimentId,String taskId,ZooKeeper zk)throws
KeeperException,
             InterruptedException {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, taskId, zk);
+        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
         if(experimentEntry == null){
             log.error("Cannot find the experiment Entry, so cancel operation cannot be performed
!!!");
         }else {
@@ -1273,11 +1294,11 @@ public class GFacUtils {
         }
 
     }
-    public static boolean isCancelled(String experimentID,
-                                             String taskID, ZooKeeper zk
+    public static boolean isCancelled(String experimentID, ZooKeeper zk
     ) throws KeeperException,
             InterruptedException {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
+		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
+
         if(experimentEntry == null){
             return false;
         }else {
@@ -1311,18 +1332,6 @@ public class GFacUtils {
 		}
 	}
 
-	public static long getDeliveryTag(String experimentID,
-									  String taskID, ZooKeeper zk, String experimentNode,
-									  String pickedChild) throws KeeperException, InterruptedException,GFacException
{
-		String experimentPath = experimentNode + File.separator + pickedChild;
-		String deliveryTagPath = experimentPath + File.separator + experimentID
-				+ "+" + taskID + DELIVERY_TAG_POSTFIX;
-		Stat exists = zk.exists(deliveryTagPath, false);
-		if(exists==null) {
-			throw new GFacException("Cannot find delivery Tag for this experiment");
-		}
-		return bytesToLong(zk.getData(deliveryTagPath, false, exists));
-	}
 	public static String getPluginData(JobExecutionContext jobExecutionContext,
 			String className) throws ApplicationSettingsException,
 			KeeperException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index d5f9f90..24b300e 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -89,7 +89,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
         try {
             ZooKeeper zk = jobExecutionContext.getZk();
             try {
-                String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID(), zk);
+                String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
zk);
                 String path = experimentEntry + File.separator + "operation";
                 Stat exists = zk.exists(path, this);
                 if (exists != null) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/764a2393/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index 993a303..9005f70 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -81,7 +81,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
             log.info("Recovering Experiment: " + expId.split("\\+")[0]);
             log.info("------------------------------------------------------------------------------------");
             try {
-                if(GFacUtils.isCancelled(expId.split("\\+")[0], expId.split("\\+")[1], zk))
{// during relaunching we check the operation and then launch
+                if(GFacUtils.isCancelled(expId.split("\\+")[0], zk)) {// during relaunching
we check the operation and then launch
                     serverHandler.terminateExperiment(expId.split("\\+")[0]);
                 }else {
                     serverHandler.launchExperiment(expId.split("\\+")[0], null);


Mime
View raw message