airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [09/15] airavata git commit: adding support to proper acking for messages
Date Mon, 23 Mar 2015 19:38:36 GMT
adding support to proper acking for messages


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1231c014
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1231c014
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1231c014

Branch: refs/heads/master
Commit: 1231c014bebd1d23c2bdd340b7d721abe279d45a
Parents: ffbb1b9
Author: Lahiru Gunathilake <glahiru@gmail.com>
Authored: Wed Feb 25 00:59:09 2015 -0500
Committer: Lahiru Gunathilake <glahiru@gmail.com>
Committed: Wed Feb 25 00:59:09 2015 -0500

----------------------------------------------------------------------
 .../airavata/api/server/AiravataAPIServer.java  |  1 +
 .../client/samples/CreateLaunchExperiment.java  | 23 +++--
 .../airavata/gfac/server/GfacServerHandler.java | 48 +++++++++--
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  1 +
 .../core/monitor/GfacInternalStatusUpdator.java |  3 +
 .../airavata/gfac/core/utils/GFacUtils.java     | 21 +++--
 .../handlers/GridPullMonitorHandler.java        |  1 +
 .../messaging/client/RabbitMQListner.java       |  4 +-
 .../airavata/messaging/core/MessageContext.java | 17 ++++
 .../core/impl/RabbitMQTaskLaunchConsumer.java   | 10 ++-
 .../server/OrchestratorServerHandler.java       | 90 ++++++++++----------
 .../util/OrchestratorRecoveryHandler.java       |  1 +
 .../core/impl/GFACPassiveJobSubmitter.java      | 10 +--
 13 files changed, 151 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
index 0e6da90..da42ce0 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
@@ -299,6 +299,7 @@ public class AiravataAPIServer implements IServer, Watcher{
 
     @Override
     synchronized public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             logger.info(state.name());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index c4c303f..78c2d71 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -47,17 +47,17 @@ import java.util.*;
 public class CreateLaunchExperiment {
 
     //FIXME: Read from a config file
-//    public static final String THRIFT_SERVER_HOST = "localhost";
-//    public static final int THRIFT_SERVER_PORT = 8930;
-	public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
-	public static final int THRIFT_SERVER_PORT = 9930;
+    public static final String THRIFT_SERVER_HOST = "localhost";
+    public static final int THRIFT_SERVER_PORT = 8930;
+//	public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
+//	public static final int THRIFT_SERVER_PORT = 9930;
 	
     private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
     private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576";
+    private static String echoAppId = "Echo_1365a7fd-eae1-4575-b447-99afb4d79c82";
     private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = "Amber_42124128-628b-484c-829d-aff8b584eb00";
@@ -93,7 +93,7 @@ public class CreateLaunchExperiment {
 //        final String expId = createEchoExperimentForFSD(airavataClient);
         List<String> experimentIds = new ArrayList<String>();
         try {
-            for (int i = 0; i < 100; i++) {
+            for (int i = 0; i < 1; i++) {
 //                final String expId = createExperimentForSSHHost(airavata);
 //                final String expId = createEchoExperimentForFSD(airavataClient);
 //                final String expId = createMPIExperimentForFSD(airavataClient);
@@ -120,12 +120,11 @@ public class CreateLaunchExperiment {
                 launchExperiment(airavataClient, expId);
             }
 
-            Thread.sleep(10000);
-
-            for(String exId:experimentIds) {
-                Experiment experiment = airavataClient.getExperiment(exId);
-                System.out.println(experiment.getExperimentStatus().toString());
-            }
+            Thread.sleep(100);
+                for (String exId : experimentIds) {
+                    Experiment experiment = airavataClient.getExperiment(exId);
+                    System.out.println(experiment.getExperimentStatus().toString());
+                }
 
 
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1c0f095..cca793e 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -65,8 +65,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
     private Registry registry;
     private AppCatalog appCatalog;
 
-    private String registryURL;
-
     private String gatewayName;
 
     private String airavataUserName;
@@ -144,12 +142,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher
{
                     CreateMode.PERSISTENT);
         }
         String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
-        String instantNode = gfacServer + File.separator + instanceId;
-        zkStat = zk.exists(instantNode, true);
+        String instanceNode = gfacServer + File.separator + instanceId;
+        zkStat = zk.exists(instanceNode, true);
         if (zkStat == null) {
-            zk.create(instantNode,
+            zk.create(instanceNode,
                     airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
+            zk.getChildren(instanceNode, true);
         }
         zkStat = zk.exists(gfacExperiments, false);
         if (zkStat == null) {
@@ -168,6 +167,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
     }
 
     synchronized public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
+        logger.info(watchedEvent.getType().toString());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             logger.info(state.name());
@@ -191,10 +192,39 @@ public class GfacServerHandler implements GfacService.Iface, Watcher
{
                 } catch (KeeperException e) {
                     logger.error(e.getMessage(), e);
                 }
+            } else if (Event.EventType.NodeDeleted.equals(watchedEvent.getType())) {
+                String path = watchedEvent.getPath();
+                String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
+                if (path.startsWith(experimentNode)) {
+                    // we got a watch when experiment is removed
+                    String deliveryPath = path + GFacUtils.DELIVERY_TAG_POSTFIX;
+                    try {
+                        Stat exists = zk.exists(deliveryPath, false);
+                        byte[] data = zk.getData(path + GFacUtils.DELIVERY_TAG_POSTFIX, false,
exists);
+                        long value = ByateArrayToLong(data);
+                        logger.info("ExperimentId+taskId" + path);
+                        logger.info("Sending Ack back to the Queue, because task is over");
+                        rabbitMQTaskLaunchConsumer.sendAck(value);
+                        ZKUtil.deleteRecursive(zk,deliveryPath);
+                    } catch (KeeperException e) {
+                        logger.error(e.getMessage(), e);
+                    } catch (InterruptedException e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
             }
         }
     }
 
+    private long ByateArrayToLong(byte[] data) {
+        long value = 0;
+        for (int i = 0; i < data.length; i++)
+        {
+            value += ((long) data[i] & 0xffL) << (8 * i);
+        }
+        return value;
+    }
+
     public String getGFACServiceVersion() throws TException {
         return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
     }
@@ -314,12 +344,18 @@ public class GfacServerHandler implements GfacService.Iface, Watcher
{
                     experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
 
                     try {
-                        GFacUtils.createExperimentEntryForRPC(event.getExperimentId(), event.getTaskId(),
zk, experimentNode, nodeName, event.getTokenId());
+                        GFacUtils.createExperimentEntryForPassive(event.getExperimentId(),
event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+                        AiravataZKUtils.getExpStatePath(event.getExperimentId(),event.getTaskId());
                         submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
                     } catch (KeeperException e) {
                         logger.error(nodeName + " was interrupted.");
+                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     } catch (InterruptedException e) {
                         logger.error(e.getMessage(), e);
+                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                    } catch (ApplicationSettingsException e) {
+                        logger.error(e.getMessage(), e);
+                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     }
                     System.out.println(" Message Received with message id '" + message.getMessageId()
                             + "' and with message type '" + message.getType());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index bb612a6..00930e5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -1158,6 +1158,7 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     public void process(WatchedEvent watchedEvent) {
+        log.info(watchedEvent.getPath());
         if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
             // node data is changed, this means node is cancelled.
             log.info("Experiment is cancelled with this path:"+watchedEvent.getPath());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 7818da0..26902e7 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -29,6 +29,7 @@ import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -49,6 +50,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener,
Watc
         MonitorID monitorID = statusChangeRequest.getMonitorID();
         String experimentPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments") +
                 File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
+ File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID();
+        String deliveryTagPath = experimentPath + GFacUtils.DELIVERY_TAG_POSTFIX;
         Stat exists = null;
         try {
             if (!zk.getState().isConnected()) {
@@ -107,6 +109,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener,
Watc
     }
 
     public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             if (state == Event.KeeperState.SyncConnected) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 9f104fa..c825ffd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -60,12 +60,14 @@ import java.io.*;
 import java.net.InetAddress;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 //import org.apache.airavata.commons.gfac.type.ActualParameter;
 
 public class GFacUtils {
 	private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+	public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
 
 	private GFacUtils() {
 	}
@@ -1156,7 +1158,7 @@ public class GFacUtils {
 	// This method is dangerous because of moving the experiment data
 	public static boolean createExperimentEntryForPassive(String experimentID,
 													  String taskID, ZooKeeper zk, String experimentNode,
-													  String pickedChild, String tokenId) throws KeeperException,
+													  String pickedChild, String tokenId,long deliveryTag) throws KeeperException,
 			InterruptedException {
 		String experimentPath = experimentNode + File.separator + pickedChild;
 		String newExpNode = experimentPath + File.separator + experimentID
@@ -1165,15 +1167,14 @@ public class GFacUtils {
 		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
 		String foundExperimentPath = null;
 		if (exists1 == null && experimentEntry == null) {  // this means this is a very new experiment
-			List<String> runningGfacNodeNames = AiravataZKUtils
-					.getAllGfacNodeNames(zk); // here we take old gfac servers
-			// too
+			List<String> runningGfacNodeNames = AiravataZKUtils.getAllGfacNodeNames(zk); // here we take old gfac servers
+
 			for (String gfacServerNode : runningGfacNodeNames) {
 				if (!gfacServerNode.equals(pickedChild)) {
 					foundExperimentPath = experimentNode + File.separator
 							+ gfacServerNode + File.separator + experimentID
 							+ "+" + taskID;
-					exists1 = zk.exists(foundExperimentPath, false);
+					exists1 = zk.exists(foundExperimentPath, true);
 					if (exists1 != null) { // when the experiment is found we
 						// break the loop
 						break;
@@ -1183,21 +1184,23 @@ public class GFacUtils {
 			if (exists1 == null) { // OK this is a pretty new experiment so we
 				// are going to create a new node
 				log.info("This is a new Job, so creating all the experiment docs from the scratch");
+				Stat expParent = zk.exists(newExpNode, false);
 				zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
 						CreateMode.PERSISTENT);
 
-				Stat expParent = zk.exists(newExpNode, false);
 				if (tokenId != null && expParent != null) {
 					zk.setData(newExpNode, tokenId.getBytes(),
 							expParent.getVersion());
 				}
-				zk.create(newExpNode + File.separator + "state", String
+				String s = zk.create(newExpNode + File.separator + "state", String
 								.valueOf(GfacExperimentState.LAUNCHED.getValue())
 								.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
 						CreateMode.PERSISTENT);
-				zk.create(newExpNode + File.separator + "operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+				String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+						CreateMode.PERSISTENT);
+				zk.exists(s1, true);// we want to know when this node get deleted
+				String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, ByteBuffer.allocate(8).putLong(deliveryTag).array(), ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
 						CreateMode.PERSISTENT);
-
 			} else {
 				// ohhh this node exists in some other failed gfac folder, we
 				// have to move it to this gfac experiment list,safely

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index e64f596..d5f9f90 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -125,6 +125,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements
Watcher{
 
 
     public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
         if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
             // node data is changed, this means node is cancelled.
             logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
index 601497a..48edbe8 100644
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
@@ -28,7 +28,7 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.commons.cli.*;
@@ -67,7 +67,7 @@ public class RabbitMQListner {
             String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
             System.out.println("broker url " + brokerUrl);
             final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
+            RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
             consumer.listen(new MessageHandler() {
                 @Override
                 public Map<String, Object> getProperties() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
index 0a39d92..272f413 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -32,6 +32,7 @@ public class MessageContext {
     private final String messageId;
     private final String gatewayId;
     private Timestamp updatedTime;
+    private long deliveryTag;
 
 
     public MessageContext(TBase message, MessageType type, String messageId, String gatewayId)
{
@@ -41,6 +42,14 @@ public class MessageContext {
         this.gatewayId = gatewayId;
     }
 
+    public MessageContext(TBase event, MessageType type, String messageId, String gatewayId,
long deliveryTag) {
+        this.event = event;
+        this.type = type;
+        this.messageId = messageId;
+        this.gatewayId = gatewayId;
+        this.deliveryTag = deliveryTag;
+    }
+
     public TBase getEvent() {
         return event;
     }
@@ -64,4 +73,12 @@ public class MessageContext {
     public String getGatewayId() {
         return gatewayId;
     }
+
+    public long getDeliveryTag() {
+        return deliveryTag;
+    }
+
+    public void setDeliveryTag(long deliveryTag) {
+        this.deliveryTag = deliveryTag;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 1c7b0e8..7c88a25 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -165,7 +165,7 @@ public class RabbitMQTaskLaunchConsumer {
                             event = taskTerminateEvent;
                             gatewayId = null;
                         }
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(),
message.getMessageId(), gatewayId);
+                        MessageContext messageContext = new MessageContext(event, message.getMessageType(),
message.getMessageId(), gatewayId,deliveryTag);
                         messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                         handler.onMessage(messageContext);
                         try {
@@ -241,4 +241,12 @@ public class RabbitMQTaskLaunchConsumer {
             }
         }
     }
+
+    public void sendAck(long deliveryTag){
+        try {
+            channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b200468..f430bc9 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -292,43 +292,45 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 	 * This method gracefully handler gfac node failures
 	 */
 	synchronized public void process(WatchedEvent watchedEvent) {
+		log.info(watchedEvent.getPath());
 		synchronized (mutex) {
 			try {
 				Event.KeeperState state = watchedEvent.getState();
 				switch (state) {
-				case SyncConnected:
-					mutex.notify();
-					break;
-                case Expired:case Disconnected:
-                        try {
-                            zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
-                            synchronized (mutex) {
-                                mutex.wait(); // waiting for the syncConnected event
-                            }
-                            String airavataServerHostPort = ServerSettings
-                                    .getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
-                                    + ":"
-                                    + ServerSettings
-                                    .getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
-                            String OrchServer = ServerSettings
-                                    .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
-                            registerOrchestratorService(airavataServerHostPort, OrchServer);
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        } catch (ApplicationSettingsException e) {
-                            e.printStackTrace();
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        } catch (KeeperException e) {
-                            e.printStackTrace();
-                        }
-                        break;
-                }
+					case SyncConnected:
+						mutex.notify();
+						break;
+					case Expired:
+					case Disconnected:
+						try {
+							zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
+							synchronized (mutex) {
+								mutex.wait(); // waiting for the syncConnected event
+							}
+							String airavataServerHostPort = ServerSettings
+									.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+									+ ":"
+									+ ServerSettings
+									.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
+							String OrchServer = ServerSettings
+									.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
+							registerOrchestratorService(airavataServerHostPort, OrchServer);
+						} catch (IOException e) {
+							e.printStackTrace();
+						} catch (ApplicationSettingsException e) {
+							e.printStackTrace();
+						} catch (InterruptedException e) {
+							e.printStackTrace();
+						} catch (KeeperException e) {
+							e.printStackTrace();
+						}
+						break;
+				}
 				if (watchedEvent.getPath() != null
 						&& watchedEvent.getPath().startsWith(
-								ServerSettings.getSetting(
-										Constants.ZOOKEEPER_GFAC_SERVER_NODE,
-										"/gfac-server"))) {
+						ServerSettings.getSetting(
+								Constants.ZOOKEEPER_GFAC_SERVER_NODE,
+								"/gfac-server"))) {
 					List<String> children = zk.getChildren(ServerSettings
 							.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,
 									"/gfac-server"), true);
@@ -340,18 +342,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 										+ File.separator + gfacNodes, this);
 					}
 					switch (watchedEvent.getType()) {
-					case NodeCreated:
-						mutex.notify();
-						break;
-					case NodeDeleted:
-						// here we have to handle gfac node shutdown case
-						if (children.size() == 0) {
-							log.error("There are not gfac instances to route failed jobs");
-							return;
-						}
-						// we recover one gfac node at a time
-						final WatchedEvent event = watchedEvent;
-						final OrchestratorServerHandler handler = this;
+						case NodeCreated:
+							mutex.notify();
+							break;
+						case NodeDeleted:
+							// here we have to handle gfac node shutdown case
+							if (children.size() == 0) {
+								log.error("There are not gfac instances to route failed jobs");
+								return;
+							}
+							// we recover one gfac node at a time
+							final WatchedEvent event = watchedEvent;
+							final OrchestratorServerHandler handler = this;
 						/*(new Thread() {  // disabling ft implementation with zk
 							public void run() {
 								int retry = 0;
@@ -372,7 +374,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 
 							}
 						}).start();*/
-						break;
+							break;
 					}
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index fb3bd51..f19b949 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -95,6 +95,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
     }
 
     synchronized public void process(WatchedEvent watchedEvent) {
+        log.info(watchedEvent.getPath());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             switch (state) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index b5e25b1..8066113 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -187,11 +187,9 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher
{
                 String[] split = gfacNodeData.split(":");
                 if (zk.exists(gfacServer + File.separator + pickedChild, false) != null)
{
                     // before submitting the job we check again the state of the node
-                    if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode,
pickedChild, null)) {
-                        TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID,
taskID, null,null);
-                        MessageContext messageContext = new MessageContext(taskSubmitEvent,
MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ UUID.randomUUID().toString(),null);
-                        publisher.publish(messageContext);
-                    }
+                    TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID,
null, null);
+                    MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.TERMINATETASK,
"LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), null);
+                    publisher.publish(messageContext);
                 }
             }
         } catch (InterruptedException e) {
@@ -217,6 +215,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
     }
 
     synchronized public void process(WatchedEvent event) {
+        logger.info(getClass().getName() + event.getPath());
+        logger.info(getClass().getName()+event.getType());
         synchronized (mutex) {
             switch (event.getState()) {
                 case SyncConnected:


Mime
View raw message