airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [2/3] git commit: more implementation for job cancel
Date Thu, 14 Aug 2014 20:44:55 GMT
More implementation for job cancellation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a4296772
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a4296772
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a4296772

Branch: refs/heads/master
Commit: a429677293c7b3677658ef9d82beca9b52d8b84d
Parents: 1c55448
Author: lahiru <lahiru@apache.org>
Authored: Fri Aug 15 02:05:39 2014 +0530
Committer: lahiru <lahiru@apache.org>
Committed: Fri Aug 15 02:05:39 2014 +0530

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |   6 -
 .../airavata/gfac/server/GfacServerHandler.java |   8 +-
 .../gfac/bes/provider/impl/BESProvider.java     |   5 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 142 +++++++++++++++++--
 .../org/apache/airavata/gfac/core/cpi/GFac.java |   8 ++
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |  51 +++++++
 .../core/monitor/GfacInternalStatusUpdator.java |  11 +-
 .../gfac/core/provider/GFacProvider.java        |   3 +-
 .../org/apache/airavata/job/TestProvider.java   |   2 +-
 .../apache/airavata/gfac/ec2/EC2Provider.java   |   2 +-
 .../gfac/gram/provider/impl/GramProvider.java   |   4 +-
 .../gsissh/provider/impl/GSISSHProvider.java    |   5 +-
 .../hadoop/provider/impl/HadoopProvider.java    |   2 +-
 .../gfac/local/provider/impl/LocalProvider.java |   2 +-
 .../gfac/ssh/provider/impl/SSHProvider.java     |   3 +-
 .../core/impl/GFACServiceJobSubmitter.java      |  43 +++++-
 .../cpi/impl/SimpleOrchestratorImpl.java        |  19 ++-
 17 files changed, 268 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 0729478..967577c 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -795,12 +795,6 @@ public class CreateLaunchExperiment {
             String sshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d";
             String gsisshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d";
             client.launchExperiment(expId, sshTokenId);
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-            client.terminateExperiment(expId);
         } catch (ExperimentNotFoundException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new ExperimentNotFoundException(e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index b098120..01aa665 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -203,7 +203,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
     }
 
     public boolean cancelJob(String experimentId, String taskId) throws TException {
-        throw new TException("Operation not supported");
+        logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId);
+        GFac gfac = getGfac();
+        try {
+            return gfac.submitJob(experimentId, taskId, ServerSettings.getSetting(Constants.GATEWAY_NAME));
+        } catch (Exception e) {
+            throw new TException("Error launching the experiment : " + e.getMessage(), e);
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index b325f62..22e823d 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -338,14 +338,13 @@ public class BESProvider extends AbstractProvider {
     /**
      * EndpointReference need to be saved to make cancel work.
      *
-     * @param activityEpr
      * @param jobExecutionContext
      * @throws GFacProviderException
      */
-    public void cancelJob(String activityEpr, JobExecutionContext jobExecutionContext) throws
GFacProviderException {
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException
{
         try {
             initSecurityProperties(jobExecutionContext);
-            EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(activityEpr);
+            EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(jobExecutionContext.getJobDetails().getJobID());
             UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
                     .getType();
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index a68a302..7a12043 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -53,7 +53,6 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.GFacRecoverableHandler;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
 import org.apache.airavata.gfac.core.monitor.JobIdentity;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.monitor.TaskIdentity;
@@ -85,7 +84,6 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer
 import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
 import org.apache.airavata.model.workspace.experiment.DataObjectType;
 import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.TaskState;
@@ -528,6 +526,81 @@ public class BetterGfacImpl implements GFac {
         return true;
     }
 
+    public boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
+        // We need to check whether this job is submitted as a part of a large workflow.
If yes,
+        // we need to setup workflow tracking listerner.
+        try {
+            int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
  // this is the original state came, if we query again it might be different,so we preserve
this state in the environment
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                    , GfacExperimentState.ACCEPTED));                  // immediately we
get the request we update the status
+            String workflowInstanceID = null;
+            if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID))
!= null) {
+                // This mean we need to register workflow tracking listener.
+                //todo implement WorkflowTrackingListener properly
+                registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
+            }
+            // Register log event listener. This is required in all scenarios.
+            jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+            if (stateVal < 2) {
+                // In this scenario We do everything from the beginning
+                log.info("Job is not yet submitted, so nothing much to do except changing
the registry entry " +
+                        " and stop the execution chain");
+                //todo update registry and find a way to stop the execution chain
+            } else if (stateVal >= 8) {
+                log.info("There is nothing to recover in this job so we do not re-submit");
+                ZKUtil.deleteRecursive(zk,
+                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID()));
+            } else {
+                // Now we know this is an old Job, so we have to handle things gracefully
+                log.info("Job is in a position to perform a proper cancellation");
+                try {
+                    Scheduler.schedule(jobExecutionContext);
+
+                    invokeProviderCancel(jobExecutionContext);
+
+                } catch (Exception e) {
+                    try {
+                        // we make the experiment as failed due to exception scenario
+                        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new
MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                        // monitorPublisher.publish(new
+                        // ExperimentStatusChangedEvent(new
+                        // ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                        // ExperimentState.FAILED));
+                        // Updating the task status if there's any task associated
+                        // monitorPublisher.publish(new TaskStatusChangeRequest(
+                        // new TaskIdentity(jobExecutionContext.getExperimentID(),
+                        // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        // jobExecutionContext.getTaskData().getTaskID()),
+                        // TaskState.FAILED
+                        // ));
+                        monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
new JobIdentity(jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext
+                                .getJobDetails().getJobID()), JobState.FAILED));
+                    } catch (NullPointerException e1) {
+                        log.error("Error occured during updating the statuses of Experiments,tasks
or Job statuses to failed, "
+                                + "NullPointerException occurred because at this point there
might not have Job Created", e1, e);
+                        //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
ExperimentState.FAILED));
+                        // Updating the task status if there's any task associated
+                        monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(jobExecutionContext.getExperimentID(),
jobExecutionContext
+                                .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()),
TaskState.FAILED));
+
+                    }
+                    jobExecutionContext.setProperty(ERROR_SENT, "true");
+                    jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+                    throw new GFacException(e.getMessage(), e);
+                }
+            }
+            return true;
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return true;
+    }
+
 	private void reLaunch(JobExecutionContext jobExecutionContext, int stateVal) throws GFacException
{
 		// Scheduler will decide the execution flow of handlers and provider
 		// which handles
@@ -547,12 +620,12 @@ public class BetterGfacImpl implements GFac {
 			// to job execution context.
 			// We get the provider instance and execute it.
 			if (stateVal == 2 || stateVal == 3) {
-				invokeProvider(jobExecutionContext); // provider never ran in
+				invokeProviderExecute(jobExecutionContext); // provider never ran in
 														// previous invocation
 			} else if (stateVal == 4) { // whether sync or async job have to
 										// invoke the recovering because it
 										// crashed in the Handler
-				reInvokeProvider(jobExecutionContext);
+				reInvokeProviderExecute(jobExecutionContext);
 			} else if (stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext))
{
 				// In this case we do nothing because provider ran successfully,
 				// no need to re-run the job
@@ -560,7 +633,7 @@ public class BetterGfacImpl implements GFac {
 			} else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext))
{
 				// this is async mode where monitoring of jobs is hapenning, we
 				// have to recover
-				reInvokeProvider(jobExecutionContext);
+				reInvokeProviderExecute(jobExecutionContext);
 			} else if (stateVal == 6) {
 				reInvokeOutFlowHandlers(jobExecutionContext);
 			} else {
@@ -624,7 +697,7 @@ public class BetterGfacImpl implements GFac {
 			// After executing the in handlers provider instance should be set
 			// to job execution context.
 			// We get the provider instance and execute it.
-			invokeProvider(jobExecutionContext);
+			invokeProviderExecute(jobExecutionContext);
 		} catch (Exception e) {
 			try {
 				// we make the experiment as failed due to exception scenario
@@ -658,7 +731,7 @@ public class BetterGfacImpl implements GFac {
 		}
 	}
 
-    private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException,
ApplicationSettingsException, InterruptedException, KeeperException {
+    private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException,
ApplicationSettingsException, InterruptedException, KeeperException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),
GfacExperimentState.PROVIDERINVOKING));
@@ -674,7 +747,7 @@ public class BetterGfacImpl implements GFac {
         }
     }
 
-    private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException,
GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException
{
+    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext) throws
GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException,
KeeperException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),
GfacExperimentState.PROVIDERINVOKING));
@@ -703,6 +776,51 @@ public class BetterGfacImpl implements GFac {
 
     }
 
+    private void invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException,
ApplicationSettingsException, InterruptedException, KeeperException {
+        GFacProvider provider = jobExecutionContext.getProvider();
+        if (provider != null) {
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),
GfacExperimentState.PROVIDERINVOKING));
+            GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+            initProvider(provider, jobExecutionContext);
+            cancelProvider(provider, jobExecutionContext);
+            disposeProvider(provider, jobExecutionContext);
+            GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(),
GfacPluginState.COMPLETED);
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),
GfacExperimentState.PROVIDERINVOKED));
+        }
+        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+            invokeOutFlowHandlers(jobExecutionContext);
+        }
+    }
+
+    private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException,
GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException
{
+        GFacProvider provider = jobExecutionContext.getProvider();
+        if (provider != null) {
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),
GfacExperimentState.PROVIDERINVOKING));
+            String plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName());
+            if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) {    //
this will make sure if a plugin crashes it will not launch from the scratch, but plugins have
to save their invoked state
+                if (provider instanceof GFacRecoverableProvider) {
+                    GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+                    ((GFacRecoverableProvider) provider).recover(jobExecutionContext);
+                    GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(),
GfacPluginState.COMPLETED);
+                }
+            } else {
+                GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+                initProvider(provider, jobExecutionContext);
+                cancelProvider(provider, jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+                GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(),
GfacPluginState.COMPLETED);
+            }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),
GfacExperimentState.PROVIDERINVOKED));
+        }
+
+        if (GFacUtils.isSynchronousMode(jobExecutionContext))
+
+        {
+            invokeOutFlowHandlers(jobExecutionContext);
+        }
+
+    }
+
 
     private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext)
throws GFacException {
         try {
@@ -720,6 +838,14 @@ public class BetterGfacImpl implements GFac {
         }
     }
 
+    private void cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext)
throws GFacException {
+        try {
+            provider.cancelJob(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while executing provider " + provider.getClass().getName()
+ " functionality.", e);
+        }
+    }
+
     private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext)
throws GFacException {
         try {
             provider.dispose(jobExecutionContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
index f161a55..79f5a0b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
@@ -53,4 +53,12 @@ public interface GFac {
      */
     public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
 
+    /**
+     * This operation can be used to cancel an already running experiment
+     * @param jobExecutionContext
+     * @return Successful cancellation will return true
+     * @throws GFacException
+     */
+    public boolean cancel(JobExecutionContext jobExecutionContext)throws GFacException;
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index d370924..bf27deb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -305,6 +305,50 @@ public class GFacImpl implements GFac {
         return true;
     }
 
+
+    public boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
+        // We need to check whether this job is submitted as a part of a large workflow.
If yes,
+        // we need to setup workflow tracking listerner.
+        String workflowInstanceID = null;
+        if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID))
!= null) {
+            // This mean we need to register workflow tracking listener.
+            //todo implement WorkflowTrackingListener properly
+            registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
+        }
+        // Register log event listener. This is required in all scenarios.
+        jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+        try {
+            Scheduler.schedule(jobExecutionContext);
+            GFacProvider provider = jobExecutionContext.getProvider();
+            if (provider != null) {
+                initProvider(provider, jobExecutionContext);
+                cancelProvider(provider, jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+            }
+        }catch (Exception e) {
+            try {
+                monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
+                        new JobIdentity(jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()),
JobState.FAILED));
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks
or Job statuses to failed, " +
+                        "NullPointerException occurred because at this point there might
not have Job Created", e1, e);
+                // Updating status if job id is not set
+//				monitorPublisher
+//						.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
+                monitorPublisher.publish(new TaskStatusChangedEvent(new TaskIdentity(jobExecutionContext.getExperimentID(),
jobExecutionContext
+                        .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()),
TaskState.FAILED));
+
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            throw new GFacException(e.getMessage(), e);
+        }
+        return true;
+    }
+
     private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
         // Scheduler will decide the execution flow of handlers and provider which handles
         // the job.
@@ -367,6 +411,13 @@ public class GFacImpl implements GFac {
             throw new GFacException("Error while executing provider " + provider.getClass().getName()
+ " functionality.", e);
         }
     }
+    private void cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext)
throws GFacException {
+        try {
+            provider.cancelJob(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while executing provider " + provider.getClass().getName()
+ " functionality.", e);
+        }
+    }
 
     private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext)
throws GFacException {
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 896bff8..a1856e6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -29,12 +29,7 @@ import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,10 +82,10 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
         }
         switch (statusChangeRequest.getState()) {
             case COMPLETED:
-//                ZKUtil.deleteRecursive(zk,experimentPath);
+                ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             case FAILED:
-//                ZKUtil.deleteRecursive(zk,experimentPath);
+                ZKUtil.deleteRecursive(zk,experimentPath);
                 break;
             default:
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
index 7c17cf2..031cf77 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
@@ -53,11 +53,10 @@ public interface GFacProvider{
 
     /**
      * Cancels all jobs relevant to an experiment.
-     * @param jobId The experiment id
      * @param jobExecutionContext The job execution context, contains runtime information.
      * @throws GFacException If an error occurred while cancelling the job.
      */
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacProviderException,
GFacException;
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,
GFacException;
 
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
index dcd7e3c..151ee19 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
@@ -29,7 +29,7 @@ import java.util.Map;
 
 public class TestProvider extends AbstractProvider {
 
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacProviderException,
GFacException {
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,
GFacException {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
index cf8a7d2..a8a9eb1 100644
--- a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
+++ b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
@@ -243,7 +243,7 @@ public class EC2Provider extends AbstractProvider {
         // Do nothing
     }
 
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException
{
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
         throw new NotImplementedException();
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
index 71d5a6b..142e492 100644
--- a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
@@ -364,8 +364,8 @@ public class GramProvider extends AbstractProvider {
     public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException
{
     }
 
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException
{
-        cancelSingleJob(jobId, jobExecutionContext);
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
+        cancelSingleJob(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext);
     }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 9ea284b..90efaad 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -26,13 +26,10 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
-import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.AbstractRecoverableProvider;
-import org.apache.airavata.gfac.core.provider.GFacProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
@@ -168,7 +165,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java
index 6543c01..30a1bf9 100644
--- a/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java
+++ b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java
@@ -143,7 +143,7 @@ public class HadoopProvider extends AbstractProvider {
     }
 
     @Override
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
         throw new NotImplementedException();
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 9672767..425f782 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -241,7 +241,7 @@ public class LocalProvider extends AbstractProvider {
         }
     }
 
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
         throw new NotImplementedException();
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index ab268df..67e6628 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.MessageContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
-import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
@@ -205,7 +204,7 @@ public class SSHProvider extends AbstractProvider {
     }
 
 
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+    public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
         throw new NotImplementedException();
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 0ca95ec..89925ee 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -124,7 +124,77 @@ public class GFACServiceJobSubmitter implements JobSubmitter, Watcher {
 	}
 
+    /**
+     * Sends a cancel request for the given experiment and task to one of the GFac instances
+     * registered in ZooKeeper.
+     * <p>
+     * The instance is picked at random from the children of the GFac server node. If the
+     * orchestrator context has no connected ZooKeeper client, a temporary session is opened
+     * for this call and closed before returning.
+     *
+     * @param experimentID id of the experiment the task belongs to
+     * @param taskID       id of the task to cancel
+     * @return the result of the GFac cancelJob call, or false if the picked GFac node
+     *         disappeared before the request could be sent
+     * @throws OrchestratorException if no GFac instance is registered, the node data is
+     *                               malformed, or a ZooKeeper/Thrift call fails
+     */
     public boolean terminate(String experimentID, String taskID) throws OrchestratorException {
-        throw new OrchestratorException(new OperationNotSupportedException("terminate method is not yet implemented"));
+        ZooKeeper zk = orchestratorContext.getZk();
+        // A session opened here is not owned by the orchestrator context, so it is closed below.
+        ZooKeeper localZk = null;
+        try {
+            if (zk == null || !zk.getState().isConnected()) {
+                String zkhostPort = AiravataZKUtils.getZKhostPort();
+                localZk = new ZooKeeper(zkhostPort, 6000, this);
+                zk = localZk;
+                synchronized (mutex) {
+                    mutex.wait();
+                }
+            }
+            String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+            List<String> children = zk.getChildren(gfacServer, this);
+            if (children.isEmpty()) {
+                // Zookeeper data need cleaning
+                throw new OrchestratorException("There is no active GFac instance to route the request");
+            }
+            // getChildren does not return children in a stable order, so pick one at random.
+            String pickedChild = children.get(new Random().nextInt(children.size()));
+            // ZooKeeper paths always use '/', whatever the local file-system separator is.
+            String gfacNodePath = gfacServer + "/" + pickedChild;
+            String gfacNodeData = new String(zk.getData(gfacNodePath, false, null));
+            logger.info("GFAC instance node data: " + gfacNodeData);
+            String[] split = gfacNodeData.split(":");
+            if (split.length < 2) {
+                throw new OrchestratorException("Invalid GFac instance node data at " + gfacNodePath + ": " + gfacNodeData);
+            }
+            // Check the node again right before sending the request, in case the instance went away.
+            if (zk.exists(gfacNodePath, false) == null) {
+                logger.error("GFac instance node " + gfacNodePath + " disappeared; cancel request was not sent");
+                return false;
+            }
+            // NOTE(review): the Thrift client's transport is not closed here - confirm whether
+            // GFacClientFactory expects callers to close it.
+            GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+            return localhost.cancelJob(experimentID, taskID);
+        } catch (OrchestratorException e) {
+            throw e;
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new OrchestratorException(e);
+        } catch (Exception e) {
+            // TException, KeeperException, ApplicationSettingsException, IOException and a
+            // non-numeric port all mean the cancel request could not be delivered.
+            throw new OrchestratorException(e);
+        } finally {
+            if (localZk != null) {
+                try {
+                    localZk.close();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
     }
 
     synchronized public void process(WatchedEvent event) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 83dccf0..70930f4 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -30,9 +30,12 @@ import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.error.ValidationResults;
 import org.apache.airavata.model.error.ValidatorResult;
 import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
 import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator;
@@ -163,10 +166,46 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         }
     }
 
-    public void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task,String tokenId)
-			throws OrchestratorException {
-
-        throw new OrchestratorException(new OperationNotSupportedException());
+    /**
+     * Requests cancellation of the jobs that belong to the given task.
+     * <p>
+     * Jobs whose state value is above 4 are logged as not cancellable and skipped; jobs with no
+     * recorded state are treated as cancellable. Because {@link JobSubmitter#terminate} works on a
+     * whole task, at most one terminate request is sent, and only when at least one job of the
+     * task may still be cancelled.
+     *
+     * @param experiment   experiment the task belongs to
+     * @param workflowNode not used by this implementation
+     * @param task         task whose jobs should be cancelled
+     * @param tokenId      not used by this implementation
+     * @throws OrchestratorException if the job submitter fails to send the cancel request
+     */
+    public void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task, String tokenId)
+            throws OrchestratorException {
+        List<JobDetails> jobDetailsList = task.getJobDetailsList();
+        if (jobDetailsList == null || jobDetailsList.isEmpty()) {
+            logger.error("No jobs found to cancel for task: " + task.getTaskID());
+            return;
+        }
+        boolean hasCancellableJob = false;
+        for (JobDetails jobDetails : jobDetailsList) {
+            JobStatus jobStatus = jobDetails.getJobStatus();
+            JobState jobState = (jobStatus == null) ? null : jobStatus.getJobState();
+            // NOTE(review): assumes every JobState with a value above 4 is a state in which the
+            // job can no longer be cancelled - confirm against the JobState enum.
+            if (jobState != null && jobState.getValue() > 4) {
+                logger.error("Cannot cancel the job, because current job state is : " + jobState.toString()
+                        + ", jobId: " + jobDetails.getJobID() + ", Job Name: " + jobDetails.getJobName());
+                continue;
+            }
+            hasCancellableJob = true;
+        }
+        if (hasCancellableJob) {
+            // terminate() works on the whole task, so a single request covers all of its jobs.
+            if (!jobSubmitter.terminate(experiment.getExperimentID(), task.getTaskID())) {
+                logger.error("Terminate request for task " + task.getTaskID() + " returned false");
+            }
+        }
     }
 
 


Mime
View raw message