airavata-commits mailing list archives

From shame...@apache.org
Subject [1/2] airavata git commit: Refactor Cancel job request and remove RPC job submitter class.
Date Tue, 12 May 2015 23:27:21 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 4f4c79963 -> 34cd927c3


Refactor Cancel job request and remove RPC job submitter class.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d2108729
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d2108729
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d2108729

Branch: refs/heads/master
Commit: d21087291855c68c692ca53601baa28629f1e7c0
Parents: 1b84883
Author: shamrath <shameerainfo@gmail.com>
Authored: Tue May 12 19:27:04 2015 -0400
Committer: shamrath <shameerainfo@gmail.com>
Committed: Tue May 12 19:27:04 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |   2 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  62 ++----
 .../airavata/gfac/core/utils/GFacUtils.java     | 206 ++++--------------
 .../gfac/monitor/email/EmailBasedMonitor.java   |   6 +-
 .../gfac/ssh/provider/impl/SSHProvider.java     |  10 +-
 .../server/OrchestratorServerHandler.java       |   4 +-
 .../core/impl/GFACRPCJobSubmitter.java          | 212 -------------------
 7 files changed, 73 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
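
Before the per-file hunks, a minimal standalone sketch of the ZooKeeper "operation" znode convention that the cancel path relies on. This is illustrative only and not part of the commit: the connection string and experiment path are placeholders, and the equivalent create-or-set logic lives in GFacUtils.setExperimentCancel, whose body this commit comments out pending rework.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class CancelMarkerSketch {

    // Record a cancel request by creating or updating the experiment's "operation" znode.
    public static void markCancel(ZooKeeper zk, String experimentEntry)
            throws KeeperException, InterruptedException {
        String operationPath = experimentEntry + "/operation";
        Stat operation = zk.exists(operationPath, false);
        if (operation == null) {
            // No entry yet: the user cancelled before the job reached GFac.
            zk.create(operationPath, "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        } else {
            // Job already submitted: flip the existing marker to "cancel".
            zk.setData(operationPath, "cancel".getBytes(), operation.getVersion());
        }
    }

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { }); // placeholder ensemble
        markCancel(zk, "/gfac-experiments/gfac-node0/experiment-123");        // hypothetical path
        zk.close();
    }
}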


http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index b90c731..f944d91 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -403,7 +403,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk);
+                    GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
                     AiravataZKUtils.getExpStatePath(event.getExperimentId());
                     cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
                     System.out.println(" Message Received with message id '" + message.getMessageId()

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 82798d1..32317f3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -220,8 +220,16 @@ public class BetterGfacImpl implements GFac,Watcher {
             StringWriter errors = new StringWriter();
             e.printStackTrace(new PrintWriter(errors));
             GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            // FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
+            // task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
             if(jobExecutionContext!=null){
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getGatewayID());
+                TaskStatusChangeRequestEvent event = new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity);
+                monitorPublisher.publish(event);
             }
             throw new GFacException(e);
         }finally {
@@ -565,10 +573,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
         try {
-            // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node
-            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
-            Stat exists = zk.exists(expPath + File.separator + "operation", false);
-            zk.getData(expPath + File.separator + "operation", this, exists);
             GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext);  // this is the original state came, if we query again it might be different,so we preserve this state in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // immediately we get the request we update the status
@@ -578,42 +582,14 @@ public class BetterGfacImpl implements GFac,Watcher {
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
-            if (isNewJob(gfacExpState)) {
-                log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " +
-                        " and stop the execution chain");
-            } else if (isCompletedJob(gfacExpState)) {
-                log.error("This experiment is almost finished, so cannot cancel this experiment");
-                ZKUtil.deleteRecursive(zk,
-                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
-            } else {
+            if (gfacExpState == GfacExperimentState.PROVIDERINVOKING) { // we already have changed registry status, we need to handle job canceling scenario.
                 log.info("Job is in a position to perform a proper cancellation");
                 try {
                     Scheduler.schedule(jobExecutionContext);
                     invokeProviderCancel(jobExecutionContext);
-                } catch (Exception e) {
-                    try {
-                        // we make the experiment as failed due to exception scenario
-                        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-                        JobStatusChangeRequestEvent changeRequestEvent = new JobStatusChangeRequestEvent();
-                        changeRequestEvent.setState(JobState.FAILED);
-                        JobIdentifier jobIdentifier = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
-                                jobExecutionContext.getTaskData().getTaskID(),
-                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                jobExecutionContext.getExperimentID(),
-                                jobExecutionContext.getGatewayID());
-                        changeRequestEvent.setJobIdentity(jobIdentifier);
-                        monitorPublisher.publish(changeRequestEvent);
-                    } catch (NullPointerException e1) {
-                        log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
-                                + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-                        // Updating the task status if there's any task associated
-                        monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED,
-                                new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                        jobExecutionContext.getExperimentID(),
-                                        jobExecutionContext.getGatewayID())));
-
-                    }
+                } catch (GFacException e) {
+                    // we make the experiment as failed due to exception scenario
+                    monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                     jobExecutionContext.setProperty(ERROR_SENT, "true");
                     jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
                     throw new GFacException(e.getMessage(), e);
@@ -794,7 +770,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
             initProvider(provider, jobExecutionContext);
             executeProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
@@ -811,7 +787,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
             if (plState != null && plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
                 initProvider(provider, jobExecutionContext);
                 executeProvider(provider, jobExecutionContext);
@@ -831,14 +807,12 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     }
 
-    private void invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             initProvider(provider, jobExecutionContext);
             cancelProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
         if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
             invokeOutFlowHandlers(jobExecutionContext);
@@ -851,7 +825,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
             if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
                 initProvider(provider, jobExecutionContext);
                 cancelProvider(provider, jobExecutionContext);
@@ -923,7 +897,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                     Class<? extends GFacHandler> handlerClass;
                     GFacHandler handler;
                     try {
-                        GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+                        GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName());
                         handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                         handler = handlerClass.newInstance();
                         handler.initProperties(handlerClassName.getProperties());
@@ -1001,7 +975,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                         Class<? extends GFacHandler> handlerClass;
                         GFacHandler handler;
                         try {
-                            GFacUtils.createPluginZnode(jobExecutionContext.getZk(), jobExecutionContext, handlerClassName.getClassName());
+                            GFacUtils.createHandlerZnode(jobExecutionContext.getZk(), jobExecutionContext, handlerClassName.getClassName());
                             handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                             handler = handlerClass.newInstance();
                             handler.initProperties(handlerClassName.getProperties());

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 9861cdc..4cd850d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -930,8 +930,8 @@ public class GFacUtils {
         return -1;
     }
 
-	public static boolean createPluginZnode(ZooKeeper zk,
-			JobExecutionContext jobExecutionContext, String className)
+	public static boolean createHandlerZnode(ZooKeeper zk,
+                                             JobExecutionContext jobExecutionContext, String className)
 			throws ApplicationSettingsException, KeeperException,
 			InterruptedException {
 		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
@@ -1048,129 +1048,22 @@ public class GFacUtils {
 	}
 
 	// This method is dangerous because of moving the experiment data
-	public static boolean createExperimentEntryForRPC(String experimentID,
-													  String taskID, ZooKeeper zk, String experimentNode,
-													  String pickedChild, String tokenId) throws KeeperException,
-			InterruptedException {
-		String experimentPath = experimentNode + File.separator + pickedChild;
-		String newExpNode = experimentPath + File.separator + experimentID;
-        Stat exists1 = zk.exists(newExpNode, false);
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
-        String foundExperimentPath = null;
-		if (exists1 == null && experimentEntry == null) {  // this means this is a very
new experiment
-			List<String> runningGfacNodeNames = AiravataZKUtils
-					.getAllGfacNodeNames(zk); // here we take old gfac servers
-												// too
-			for (String gfacServerNode : runningGfacNodeNames) {
-				if (!gfacServerNode.equals(pickedChild)) {
-					foundExperimentPath = experimentNode + File.separator
-							+ gfacServerNode + File.separator + experimentID;
-					exists1 = zk.exists(foundExperimentPath, false);
-					if (exists1 != null) { // when the experiment is found we
-											// break the loop
-						break;
-					}
-				}
-			}
-			if (exists1 == null) { // OK this is a pretty new experiment so we
-									// are going to create a new node
-				log.info("This is a new Job, so creating all the experiment docs from the scratch");
-				zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-						CreateMode.PERSISTENT);
-
-				Stat expParent = zk.exists(newExpNode, false);
-				if (tokenId != null && expParent != null) {
-					zk.setData(newExpNode, tokenId.getBytes(),
-							expParent.getVersion());
-				}
-				zk.create(newExpNode + File.separator + "state", String
-						.valueOf(GfacExperimentState.LAUNCHED.getValue())
-						.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-						CreateMode.PERSISTENT);
-                zk.create(newExpNode + File.separator + "operation","submit".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-
-			} else {
-				// ohhh this node exists in some other failed gfac folder, we
-				// have to move it to this gfac experiment list,safely
-				log.info("This is an old Job, so copying data from old experiment location");
-				zk.create(newExpNode,
-						zk.getData(foundExperimentPath, false, exists1),
-						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-				List<String> children = zk.getChildren(foundExperimentPath,
-						false);
-				for (String childNode1 : children) {
-					String level1 = foundExperimentPath + File.separator
-							+ childNode1;
-					Stat exists2 = zk.exists(level1, false); // no need to check
-																// exists
-					String newLeve1 = newExpNode + File.separator + childNode1;
-					log.info("Creating new znode: " + newLeve1); // these has to
-																	// be info
-																	// logs
-					zk.create(newLeve1, zk.getData(level1, false, exists2),
-							ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-					for (String childNode2 : zk.getChildren(level1, false)) {
-						String level2 = level1 + File.separator + childNode2;
-						Stat exists3 = zk.exists(level2, false); // no need to
-																	// check
-																	// exists
-						String newLeve2 = newLeve1 + File.separator
-								+ childNode2;
-						log.info("Creating new znode: " + newLeve2);
-						zk.create(newLeve2, zk.getData(level2, false, exists3),
-								ZooDefs.Ids.OPEN_ACL_UNSAFE,
-								CreateMode.PERSISTENT);
-					}
-				}
-				// After all the files are successfully transfered we delete the
-				// old experiment,otherwise we do
-				// not delete a single file
-				log.info("After a successful copying of experiment data for an old experiment we delete
the old data");
-				log.info("Deleting experiment data: " + foundExperimentPath);
-				ZKUtil.deleteRecursive(zk, foundExperimentPath);
-			}
-		}else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,zk) ){
-            // this happens when a cancel request comes to a differnt gfac node, in this
case we do not move gfac experiment
-            // node to gfac node specific location, because original request execution will
fail with errors
-            log.error("This experiment is already cancelled and its already executing the
cancel operation so cannot submit again !");
-            return false;
-        } else {
-            log.error("ExperimentID: " + experimentID + " taskID: " + taskID
-                    + " is already running by this Gfac instance");
-            List<String> runningGfacNodeNames = AiravataZKUtils
-                    .getAllGfacNodeNames(zk); // here we take old gfac servers
-            // too
-            for (String gfacServerNode : runningGfacNodeNames) {
-                if (!gfacServerNode.equals(pickedChild)) {
-                    foundExperimentPath = experimentNode + File.separator
-                            + gfacServerNode + File.separator + experimentID;
-                    break;
-                }
-            }
-            ZKUtil.deleteRecursive(zk, foundExperimentPath);
-        }
-        return true;
-	}
-
-	// This method is dangerous because of moving the experiment data
 	public static boolean createExperimentEntryForPassive(String experimentID,
 														  String taskID, ZooKeeper zk, String experimentNode,
 														  String pickedChild, String tokenId, long deliveryTag) throws KeeperException,
 			InterruptedException, ApplicationSettingsException {
 		String experimentPath = experimentNode + File.separator + pickedChild;
-		String newExpNode = experimentPath + File.separator + experimentID;
-		Stat exists1 = zk.exists(newExpNode, false);
-		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
-		if (experimentEntry == null) {  // this means this is a very new experiment
+		String newExperimentPath = experimentPath + File.separator + experimentID;
+		Stat exists1 = zk.exists(newExperimentPath, false);
+		String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, zk);
+		if (oldExperimentPath == null) {  // this means this is a very new experiment
 			// are going to create a new node
 			log.info("This is a new Job, so creating all the experiment docs from the scratch");
 
-			zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+			zk.create(newExperimentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
 					CreateMode.PERSISTENT);
 
-            String s = zk.create(newExpNode + File.separator + "state", String
+            String s = zk.create(newExperimentPath + File.separator + "state", String
 							.valueOf(GfacExperimentState.LAUNCHED.getValue())
 							.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
 					CreateMode.PERSISTENT);
@@ -1180,65 +1073,49 @@ public class GFacUtils {
 			}else{
 				log.error("Error creating node: "+s+" successfully !");
 			}
-
-			String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-					CreateMode.PERSISTENT);
-			zk.exists(s1, false);// we want to know when this node get deleted
-			zk.create(newExpNode + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
+			zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
 					CreateMode.PERSISTENT);
 		} else {
 			log.error("ExperimentID: " + experimentID + " taskID: " + taskID
 					+ " was running by some Gfac instance,but it failed");
-			if(newExpNode.equals(experimentEntry)){
+			if(newExperimentPath.equals(oldExperimentPath)){
 				log.info("Re-launch experiment came to the same GFac instance");
 			}else {
 				log.info("Re-launch experiment came to a new GFac instance so we are moving data to new
gfac node");
-				zk.create(newExpNode,
-						zk.getData(experimentEntry, false, exists1),
-						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-				List<String> children = zk.getChildren(experimentEntry,
-						false);
-				for (String childNode1 : children) {
-					String level1 = experimentEntry + File.separator
-							+ childNode1;
-					Stat exists2 = zk.exists(level1, false); // no need to check exists
-					String newLeve1 = newExpNode + File.separator + childNode1;
-					log.info("Creating new znode: " + newLeve1); // these has to be info logs
-					zk.create(newLeve1, zk.getData(level1, false, exists2),
-							ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-					for (String childNode2 : zk.getChildren(level1, false)) {
-						String level2 = level1 + File.separator + childNode2;
-						Stat exists3 = zk.exists(level2, false); // no need to check exists
-						String newLeve2 = newLeve1 + File.separator
-								+ childNode2;
-						log.info("Creating new znode: " + newLeve2);
-						zk.create(newLeve2, zk.getData(level2, false, exists3),
-								ZooDefs.Ids.OPEN_ACL_UNSAFE,
-								CreateMode.PERSISTENT);
-					}
-				}
-
-
-				String oldDeliveryTag = experimentEntry + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
+				zk.create(newExperimentPath, zk.getData(oldExperimentPath, false, exists1),
+						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // recursively copy children
+                copyChildren(zk, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+				String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
 				Stat exists = zk.exists(oldDeliveryTag, false);
 				if(exists!=null) {
-					zk.create(newExpNode + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+					zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
 							zk.getData(oldDeliveryTag,null,exists),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 					ZKUtil.deleteRecursive(zk,oldDeliveryTag);
 				}
-				// After all the files are successfully transfered we delete the
-				// old experiment,otherwise we do
+				// After all the files are successfully transfered we delete the // old experiment,otherwise we do
 				// not delete a single file
 				log.info("After a successful copying of experiment data for an old experiment we delete
the old data");
-				log.info("Deleting experiment data: " + experimentEntry);
-				ZKUtil.deleteRecursive(zk, experimentEntry);
+				log.info("Deleting experiment data: " + oldExperimentPath);
+				ZKUtil.deleteRecursive(zk, oldExperimentPath);
 			}
 		}
 		return true;
 	}
 
-	/**
+    private static void copyChildren(ZooKeeper zk, String oldPath, String newPath, int depth) throws KeeperException, InterruptedException {
+        for (String childNode : zk.getChildren(oldPath, false)) {
+            String oldChildPath = oldPath + File.separator + childNode;
+            Stat stat = zk.exists(oldChildPath, false); // no need to check exists
+            String newChildPath = newPath + File.separator + childNode;
+            log.info("Creating new znode: " + newChildPath);
+            zk.create(newChildPath, zk.getData(oldChildPath, false, stat), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            if (--depth > 0) {
+                copyChildren(zk , oldChildPath, newChildPath, depth );
+            }
+        }
+    }
+
+    /**
 	 * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
 	 * through gfac-server nodes
 	 * @param experimentID
@@ -1292,12 +1169,21 @@ public class GFacUtils {
 		return null;
 	}
 
-    public static void setExperimentCancel(String experimentId,String taskId,ZooKeeper zk)throws KeeperException,
+    public static void setExperimentCancel(String experimentId, String taskId, ZooKeeper zk, String experimentNode,
+                                           String pickedChild, String tokenId, long deliveryTag)throws KeeperException,
             InterruptedException {
+        // TODO : remove this if all went well
+ /*       String experimentPath = experimentNode + File.separator + pickedChild;
+        String newExpNode = experimentPath + File.separator + experimentId;
         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
-        if(experimentEntry == null){
-            log.error("Cannot find the experiment Entry, so cancel operation cannot be performed !!!");
-        }else {
+        if (experimentEntry == null) {
+            // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
+            log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
+                    "This happen when experiment completed and already removed from the zookeeper");
+        } else {
+            if (newExpNode.equals(experimentEntry)) {
+                log.info("Cancel experiment come to ");
+            }
             Stat operation = zk.exists(experimentEntry + File.separator + "operation", false);
             if (operation == null) { // if there is no entry, this will come when a user immediately cancel a job
                 zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
@@ -1305,7 +1191,7 @@ public class GFacUtils {
             } else { // if user submit the job to gfac then cancel during execution
                 zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(), operation.getVersion());
             }
-        }
+        }*/
 
     }
     public static boolean isCancelled(String experimentID, ZooKeeper zk

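The new copyChildren helper above replaces the hand-written two-level znode copy that was deleted from the experiment-entry methods. A self-contained sketch of the same depth-limited recursion follows; it is an illustration rather than the committed code: the paths and connection string are hypothetical, and it uses a plain "/" separator where the committed code builds paths with File.separator.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZnodeCopySketch {

    // Copy oldPath's children (and their data) under newPath, descending at most `depth` levels.
    static void copyChildren(ZooKeeper zk, String oldPath, String newPath, int depth)
            throws KeeperException, InterruptedException {
        for (String child : zk.getChildren(oldPath, false)) {
            String oldChild = oldPath + "/" + child;
            String newChild = newPath + "/" + child;
            Stat stat = zk.exists(oldChild, false);
            zk.create(newChild, zk.getData(oldChild, false, stat),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            if (depth - 1 > 0) {
                copyChildren(zk, oldChild, newChild, depth - 1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { }); // placeholder ensemble
        String oldExp = "/gfac-experiments/old-gfac-node/experiment-123";     // hypothetical paths
        String newExp = "/gfac-experiments/this-gfac-node/experiment-123";
        zk.create(newExp, zk.getData(oldExp, false, zk.exists(oldExp, false)),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        copyChildren(zk, oldExp, newExp, 2); // same depth limit createExperimentEntryForPassive uses
        zk.close();
    }
}
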
http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 6a294af..0c94daa 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -285,17 +285,17 @@ public class EmailBasedMonitor implements Runnable{
             log.info("[EJM]: Job failed email received , removed job from job monitoring.
" + jobDetails);
         }else if (resultState == JobState.CANCELED) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
-            runOutHandlers = true;
+            runOutHandlers = false; // Do we need to run out handlers in canceled case?
             log.info("[EJM]: Job canceled mail received, removed job from job monitoring.
" + jobDetails);
 
         }
+        log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
+        publishJobStatusChange(jEC);
 
         if (runOutHandlers) {
             log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
             GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher()));
         }
-        log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
-        publishJobStatusChange(jEC);
     }
 
     private void publishJobStatusChange(JobExecutionContext jobExecutionContext) {

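A condensed, illustrative restatement of the reordered flow above: the status change is published to the bus before the out-handler chain runs, and the chain is skipped for canceled jobs. The interfaces are hypothetical stand-ins, not the Airavata classes used in the hunk.

public class MonitorFlowSketch {

    interface StatusPublisher { void publish(String jobId, String state); }
    interface OutHandlerChain { void run(String jobId); }

    static void onJobStateEmail(String jobId, String state,
                                StatusPublisher publisher, OutHandlerChain handlers) {
        boolean runOutHandlers = "COMPLETE".equals(state); // simplified: only completed jobs run the chain
        publisher.publish(jobId, state);                   // publish first, unconditionally
        if (runOutHandlers) {
            handlers.run(jobId);                           // then hand off to the out handlers
        }
    }

    public static void main(String[] args) {
        onJobStateEmail("12345", "CANCELED",
                (id, st) -> System.out.println("publish " + id + " -> " + st),
                id -> System.out.println("out handlers for " + id));
    }
}
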
http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 485029f..ca24502 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -243,22 +243,20 @@ public class SSHProvider extends AbstractProvider {
             }
             // This installed path is a mandetory field, because this could change based on the computing resource
             if (jobDetails == null) {
-                log.error("There is not JobDetails so cancelations cannot perform !!!");
+                log.error("There is not JobDetails, Cancel request can't be performed !!!");
                 return;
             }
             try {
                 if (jobDetails.getJobID() != null) {
                     cluster.cancelJob(jobDetails.getJobID());
+                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
                 } else {
                     log.error("No Job Id is set, so cannot perform the cancel operation !!!");
-                    return;
+                    throw new GFacProviderException("Cancel request failed to cancel job as JobId is null in Job Execution Context");
                 }
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
             } catch (SSHApiException e) {
                 String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
                 log.error(error);
-                jobDetails.setJobID("none");
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
                 StringWriter errors = new StringWriter();
                 e.printStackTrace(new PrintWriter(errors));
                 GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
@@ -266,8 +264,6 @@ public class SSHProvider extends AbstractProvider {
             } catch (Exception e) {
                 String error = "Error submitting the job to host " + jobExecutionContext.getHostName()
+ " message: " + e.getMessage();
                 log.error(error);
-                jobDetails.setJobID("none");
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
                 StringWriter errors = new StringWriter();
                 e.printStackTrace(new PrintWriter(errors));
                 GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT,
ErrorCategory.AIRAVATA_INTERNAL_ERROR);

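An illustrative-only restatement of the revised cancel behaviour above: the CANCELED status is saved only after the remote cancel succeeds, and a missing job ID now raises an error instead of returning silently. All types here are hypothetical stand-ins rather than Airavata APIs.

public class CancelFlowSketch {

    interface Cluster { void cancelJob(String jobId) throws Exception; }
    interface StatusStore { void saveStatus(String jobId, String state); }

    static void cancel(String jobId, Cluster cluster, StatusStore store) throws Exception {
        if (jobId == null) {
            // Previously this path only logged and returned; the commit turns it into a hard failure.
            throw new IllegalStateException("Cancel request failed: no job ID in the execution context");
        }
        cluster.cancelJob(jobId);            // ask the compute resource to cancel first
        store.saveStatus(jobId, "CANCELED"); // record the status only after the cancel succeeded
    }

    public static void main(String[] args) throws Exception {
        cancel("12345", id -> System.out.println("cancelled " + id),
                (id, st) -> System.out.println(id + " -> " + st));
    }
}
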
http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index ff6eab1..3da1e47 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -566,7 +566,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
                         taskDetails.setTaskStatus(taskStatus);
                         registry.update(RegistryModelType.TASK_DETAIL, o,
                                 taskDetails);
-                        GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk);
+//                        GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
                     }
                 }
             }else {
@@ -617,7 +617,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
                             taskDetails.setTaskStatus(taskStatus);
                             registry.update(RegistryModelType.TASK_DETAIL, o,
                                     taskDetails.getTaskID());
-                            GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk);
+//                            GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
                         }
                         // iterate through all the generated tasks and performs the
                         // job submisssion+monitoring

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
deleted file mode 100644
index 64ced47..0000000
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.orchestrator.core.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.client.GFACInstance;
-import org.apache.airavata.gfac.client.GFacClientFactory;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * this class is responsible for submitting a job to gfac in service mode,
- * it will select a gfac instance based on the incoming request and submit to that
- * gfac instance.
- */
-public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
-	private final static Logger logger = LoggerFactory.getLogger(GFACRPCJobSubmitter.class);
-	public static final String IP = "ip";
-
-	private OrchestratorContext orchestratorContext;
-
-	private static Integer mutex = -1;
-
-	public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException
{
-		this.orchestratorContext = orchestratorContext;
-	}
-
-	public GFACInstance selectGFACInstance() throws OrchestratorException {
-		// currently we only support one instance but future we have to pick an
-		// instance
-		return null;
-	}
-
-	public boolean submit(String experimentID, String taskID) throws OrchestratorException {
-		return this.submit(experimentID, taskID, null);
-	}
-
-	public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException
{
-		ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client gfacClient = null;
-		try {
-			if (zk == null || !zk.getState().isConnected()) {
-				String zkhostPort = AiravataZKUtils.getZKhostPort();
-				zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
-				synchronized (mutex) {
-					mutex.wait();
-				}
-			}
-			String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-			String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
-			List<String> children = zk.getChildren(gfacServer, this);
-			
-			if (children.size() == 0) {
-                // Zookeeper data need cleaning
-                throw new OrchestratorException("There is no active GFac instance to route
the request");
-            } else {
-				String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
-				// here we are not using an index because the getChildren does not return the same order
everytime
-				String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild,
false, null));
-				logger.info("GFAC instance node data: " + gfacNodeData);
-				String[] split = gfacNodeData.split(":");
-				gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
-				if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
-					// before submitting the job we check again the state of the node
-					if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode,
pickedChild, tokenId)) {
-						 String gatewayId = null;
-                    	 CredentialReader credentialReader = GFacUtils.getCredentialReader();
-                         if (credentialReader != null) {
-                             try {
-                            	 gatewayId = credentialReader.getGatewayID(tokenId);
-                             } catch (Exception e) {
-                                 logger.error(e.getLocalizedMessage());
-                             }
-                         }
-                        if(gatewayId == null || gatewayId.isEmpty()){
-                         gatewayId = ServerSettings.getDefaultUserGateway();
-                        }
-						return gfacClient.submitJob(experimentID, taskID, gatewayId);
-					}
-				}
-			}
-		} catch (TException e) {
-            logger.error(e.getMessage(), e);
-			throw new OrchestratorException(e);
-		} catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-		} catch (KeeperException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-		} catch (ApplicationSettingsException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-		} catch (IOException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-		} catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-		}finally {
-            gfacClient.getOutputProtocol().getTransport().close();
-        }
-        return false;
-	}
-
-    public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException
{
-        ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client localhost = null;
-        try {
-            if (zk == null || !zk.getState().isConnected()) {
-                String zkhostPort = AiravataZKUtils.getZKhostPort();
-                zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
-                synchronized (mutex) {
-                    mutex.wait();
-                }
-            }
-            String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,
"/gfac-server");
-            String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
-            List<String> children = zk.getChildren(gfacServer, this);
-
-            if (children.size() == 0) {
-                // Zookeeper data need cleaning
-                throw new OrchestratorException("There is no active GFac instance to route
the request");
-            } else {
-                String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE)
% children.size());
-                // here we are not using an index because the getChildren does not return
the same order everytime
-                String gfacNodeData = new String(zk.getData(gfacServer + File.separator +
pickedChild, false, null));
-                logger.info("GFAC instance node data: " + gfacNodeData);
-                String[] split = gfacNodeData.split(":");
-                localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
-                if (zk.exists(gfacServer + File.separator + pickedChild, false) != null)
{
-                    // before submitting the job we check again the state of the node
-                    if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode,
pickedChild, null)) {
-                        return localhost.cancelJob(experimentID, taskID);
-                    }
-                }
-            }
-        } catch (TException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (KeeperException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (ApplicationSettingsException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        }finally {
-
-        }
-        return false;
-    }
-
-    synchronized public void process(WatchedEvent event) {
-		synchronized (mutex) {
-			switch (event.getState()) {
-			case SyncConnected:
-				mutex.notify();
-			}
-			switch (event.getType()) {
-			case NodeCreated:
-				mutex.notify();
-				break;
-			}
-		}
-	}
-}

