airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [1/5] airavata git commit: data streaming task
Date Wed, 11 Nov 2015 17:09:39 GMT
Repository: airavata
Updated Branches:
  refs/heads/develop c96f66b52 -> 5277ec48d


data streaming task


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/37e94a07
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/37e94a07
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/37e94a07

Branch: refs/heads/develop
Commit: 37e94a0780b9bbcfe80325d3cadd0d0099f68867
Parents: dd32d06
Author: Chathuri Wimalasena <chathuri@apache.org>
Authored: Wed Nov 11 11:37:37 2015 -0500
Committer: Chathuri Wimalasena <chathuri@apache.org>
Committed: Wed Nov 11 11:37:37 2015 -0500

----------------------------------------------------------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      | 31 +++++++++++++++++++-
 .../gfac/impl/task/utils/StreamData.java        | 14 +++++++--
 2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/37e94a07/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index d386f66..ae6db61 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -35,6 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.task.DataStreamingTask;
 import org.apache.airavata.gfac.impl.task.EnvironmentSetupTask;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.*;
@@ -275,6 +276,19 @@ public class GFacEngineImpl implements GFacEngine {
 
             }
 
+            List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+            if (processOutputs != null && !processOutputs.isEmpty()){
+                for (OutputDataObjectType output : processOutputs){
+                    if (output.isOutputStreaming()){
+                        status = new ProcessStatus(ProcessState.EXECUTING);
+                        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        processContext.setProcessStatus(status);
+                        GFacUtils.saveAndPublishProcessStatus(processContext);
+                        executeDataStreaming(taskContext, processContext.isRecovery());
+                    }
+                }
+            }
+
             if (processContext.isPauseTaskExecution()) {
                 return;   // If any task put processContext to wait, the same task must continue processContext execution.
             }
@@ -342,7 +356,6 @@ public class GFacEngineImpl implements GFacEngine {
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
             taskContext.setTaskStatus(taskStatus);
             GFacUtils.saveAndPublishTaskStatus(taskContext);
-
             checkFailures(taskContext, taskStatus, jobSubmissionTask);
             return false;
         } catch (TException e) {
@@ -350,6 +363,22 @@ public class GFacEngineImpl implements GFacEngine {
         }
     }
 
+    private void executeDataStreaming(TaskContext taskContext, boolean recovery) throws GFacException {
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+        try {
+            DataStreamingTask dataStreamingTask = new DataStreamingTask();
+            taskStatus = executeTask(taskContext, dataStreamingTask, recovery);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.setTaskStatus(taskStatus);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+        } catch (Exception e) {
+            throw new GFacException(e);
+        }
+    }
+
     private boolean configureWorkspace(TaskContext taskContext, boolean recover) throws GFacException {
 
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/37e94a07/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index fe5f8b7..16f21a2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -41,6 +41,8 @@ import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +77,16 @@ public class StreamData extends TimerTask  {
     @Override
     public void run() {
         try {
-            runOutputStaging();
+            // output staging should start when the job is in active state
+            JobStatus jobStatus = taskContext.getParentProcessContext().getJobModel().getJobStatus();
+            if (jobStatus != null && jobStatus.getJobState().equals(JobState.ACTIVE)){
+                runOutputStaging();
+            }
+
+            // output staging should end when the job is complete
+            if (jobStatus != null && jobStatus.getJobState().equals(JobState.COMPLETE) || jobStatus.getJobState().equals(JobState.CANCELED) || jobStatus.getJobState().equals(JobState.FAILED)){
+                this.cancel();
+            }
         } catch (URISyntaxException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Erroneous path specified",
                     taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
@@ -135,7 +146,6 @@ public class StreamData extends TimerTask  {
             Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
             String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
             SSHUtils.makeDirectory(targetPath, sshSession);
-            // TODO - save updated subtask model with new destination
             outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
         } catch (GFacException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error while output staging",


Mime
View raw message