airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/5] airavata git commit: refactored , submitJob and relaunch methods logic with states.
Date Sun, 10 May 2015 20:41:09 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 34a840108 -> e4be39e81


refactored , submitJob and relaunch methods logic with states.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4819dbb2
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4819dbb2
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4819dbb2

Branch: refs/heads/master
Commit: 4819dbb2c4f05f8ec4790bcbda641556e19944d1
Parents: 7023991
Author: shamrath <shameerainfo@gmail.com>
Authored: Sun May 10 11:10:41 2015 -0400
Committer: shamrath <shameerainfo@gmail.com>
Committed: Sun May 10 11:10:41 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 109 +++++++++++--------
 1 file changed, 65 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4819dbb2/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fcb1394..98ba942 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -494,7 +494,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
zk);
             Stat exists = zk.exists(experimentEntry + File.separator + "operation", false);
             zk.getData(experimentEntry + File.separator + "operation", this, exists);
-            int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
  // this is the original state came, if we query again it might be different,so we preserve
this state in the environment
+            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext);
  // this is the original state came, if we query again it might be different,so we preserve
this state in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // immediately we
get the request we update the status
             String workflowInstanceID = null;
@@ -505,17 +505,17 @@ public class BetterGfacImpl implements GFac,Watcher {
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
-            if (stateVal < 2) {
+            if (isNewJob(gfacExpState)) {
                 // In this scenario We do everything from the beginning
                 launch(jobExecutionContext);
-            } else if (stateVal >= 8) {
+            } else if (isCompletedJob(gfacExpState)) {
                 log.info("There is nothing to recover in this job so we do not re-submit");
                 ZKUtil.deleteRecursive(zk,
                         AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
             } else {
                 // Now we know this is an old Job, so we have to handle things gracefully
                 log.info("Re-launching the job in GFac because this is re-submitted to GFac");
-                reLaunch(jobExecutionContext, stateVal);
+                reLaunch(jobExecutionContext, gfacExpState);
             }
             return true;
         } catch (ApplicationSettingsException e) {
@@ -530,6 +530,27 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
+    private boolean isCompletedJob(GfacExperimentState gfacExpState) {
+        switch (gfacExpState) {
+            case COMPLETED:
+            case FAILED:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private boolean isNewJob(GfacExperimentState stateVal) {
+        switch (stateVal) {
+            case UNKNOWN:
+            case LAUNCHED:
+            case ACCEPTED:
+                return true;
+            default:
+                return false;
+        }
+    }
+
     public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException
{
         JobExecutionContext jobExecutionContext = null;
         try {
@@ -635,7 +656,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-	private void reLaunch(JobExecutionContext jobExecutionContext, int stateVal) throws GFacException
{
+	private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state)
throws GFacException {
 		// Scheduler will decide the execution flow of handlers and provider
 		// which handles
 		// the job.
@@ -648,47 +669,46 @@ public class BetterGfacImpl implements GFac,Watcher {
 			// here we do not skip handler if some handler does not have to be
 			// run again during re-run it can implement
 			// that logic in to the handler
-			reInvokeInFlowHandlers(jobExecutionContext);
 
-			// After executing the in handlers provider instance should be set
-			// to job execution context.
-			// We get the provider instance and execute it.
-			if (stateVal == 2 || stateVal == 3) {
-				invokeProviderExecute(jobExecutionContext); // provider never ran in
-														// previous invocation
-			} else if (stateVal == 4) { // whether sync or async job have to
-										// invoke the recovering because it
-										// crashed in the Handler
-				reInvokeProviderExecute(jobExecutionContext);
-			} else if (stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext))
{
-				// In this case we do nothing because provider ran successfully,
-				// no need to re-run the job
-				log.info("Provider does not have to be recovered because it ran successfully for experiment:
" + experimentID);
-			} else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext))
{
-				// this is async mode where monitoring of jobs is hapenning, we  have to recover
-				reInvokeProviderExecute(jobExecutionContext);
-			} else if (stateVal == 6) {
-				reInvokeOutFlowHandlers(jobExecutionContext);
-			} else {
-				log.info("We skip invoking Handler, because the experiment:" + stateVal + " state is
beyond the Provider Invocation !!!");
-				log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
-			}
+            // After executing the in handlers provider instance should be set
+            // to job execution context.
+            // We get the provider instance and execute it.
+            switch (state) {
+                case INHANDLERSINVOKING:
+                    reInvokeInFlowHandlers(jobExecutionContext);
+                case INHANDLERSINVOKED:
+                    invokeProviderExecute(jobExecutionContext);
+                    break;
+                case PROVIDERINVOKING:
+                    reInvokeProviderExecute(jobExecutionContext);
+                    break;
+                case PROVIDERINVOKED:
+                    // no need to re-run the job
+                    log.info("Provider does not have to be recovered because it ran successfully
for experiment: " + experimentID);
+                    if (!GFacUtils.isSynchronousMode(jobExecutionContext)) {
+                        monitorJob(jobExecutionContext);
+                    } else {
+                        // TODO - Need to handle this correctly , for now we will invoke
ouput handlers.
+                        invokeOutFlowHandlers(jobExecutionContext);
+                    }
+                    break;
+                case OUTHANDLERSINVOKING:
+                    reInvokeOutFlowHandlers(jobExecutionContext);
+                    break;
+                case OUTHANDLERSINVOKED:
+                case COMPLETED:
+                case FAILED:
+                case UNKNOWN:
+                    log.info("All output handlers are invoked successfully, ExperimentId:
" + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
+                    break;
+                default:
+                    throw new GFacException("Un-handled GfacExperimentState : " + state.name());
+            }
 		} catch (Exception e) {
             log.error(e.getMessage(),e);
 			try {
 				// we make the experiment as failed due to exception scenario
 				monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),
GfacExperimentState.FAILED));
-				// monitorPublisher.publish(new
-				// ExperimentStatusChangedEvent(new
-				// ExperimentIdentity(jobExecutionContext.getExperimentID()),
-				// ExperimentState.FAILED));
-				// Updating the task status if there's any task associated
-				// monitorPublisher.publish(new TaskStatusChangedEvent(
-				// new TaskIdentity(jobExecutionContext.getExperimentID(),
-				// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-				// jobExecutionContext.getTaskData().getTaskID()),
-				// TaskState.FAILED
-				// ));
                 JobIdentifier jobIdentity = new JobIdentifier(
                         jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -699,9 +719,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 			} catch (NullPointerException e1) {
 				log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses
to failed, "
 						+ "NullPointerException occurred because at this point there might not have Job Created",
e1, e);
-//				monitorPublisher
-//						.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
ExperimentState.FAILED));
-				// Updating the task status if there's any task associated
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
@@ -716,7 +733,11 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-	private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
+    private void monitorJob(JobExecutionContext jobExecutionContext) {
+        // TODO - Auto generated message.
+    }
+
+    private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
 		// Scheduler will decide the execution flow of handlers and provider
 		// which handles
 		// the job.


Mime
View raw message