airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dimuthu...@apache.org
Subject [airavata] branch staging updated: Refactoring post workflow manager
Date Sat, 10 Nov 2018 05:09:32 GMT
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new f3100aa  Refactoring post workflow manager
f3100aa is described below

commit f3100aa85c220c42c18a74d8022490d1fa5550ae
Author: Dimuthu Wannipurage <dimuthu.wannipurage@datasprouts.com>
AuthorDate: Sat Nov 10 00:09:23 2018 -0500

    Refactoring post workflow manager
---
 .../helix/impl/workflow/PostWorkflowManager.java   | 163 +++++++++++----------
 1 file changed, 84 insertions(+), 79 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 3f86db5..86e86dc 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -140,90 +140,12 @@ public class PostWorkflowManager extends WorkflowManager {
                     if (jobStatusResult.getState() == JobState.COMPLETE || jobStatusResult.getState()
== JobState.FAILED) {
                         // if the job is FAILED, still run output staging tasks to debug
the reason for failure. And update
                         // the experiment status as COMPLETED as this job failure is not
related to Airavata scope.
-
                         logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId()
+ " with process id "
                                 + processId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
 
                         logger.info("Job " + jobStatusResult.getJobId() + " was completed");
 
-                        RegistryService.Client registryClient = getRegistryClientPool().getResource();
-
-                        ProcessModel processModel;
-                        ExperimentModel experimentModel;
-                        try {
-                            processModel = registryClient.getProcess(processId);
-                            experimentModel = registryClient.getExperiment(processModel.getExperimentId());
-                            getRegistryClientPool().returnResource(registryClient);
-
-                        } catch (Exception e) {
-                            logger.error("Failed to fetch experiment or process from registry
associated with process id " + processId, e);
-                            getRegistryClientPool().returnResource(registryClient);
-                            throw new Exception("Failed to fetch experiment or process from
registry associated with process id " + processId, e);
-                        }
-
-                        String taskDag = processModel.getTaskDag();
-                        List<TaskModel> taskList = processModel.getTasks();
-
-                        String[] taskIds = taskDag.split(",");
-                        final List<AiravataTask> allTasks = new ArrayList<>();
-
-                        boolean jobSubmissionFound = false;
-
-                        for (String taskId : taskIds) {
-                            Optional<TaskModel> model = taskList.stream().filter(taskModel
-> taskModel.getTaskId().equals(taskId)).findFirst();
-
-                            if (model.isPresent()) {
-                                TaskModel taskModel = model.get();
-                                AiravataTask airavataTask = null;
-                                if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION)
{
-                                    jobSubmissionFound = true;
-                                } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING)
{
-                                    if (jobSubmissionFound) {
-                                        DataStagingTaskModel subTaskModel = (DataStagingTaskModel)
ThriftUtils.getSubTaskModel(taskModel);
-                                        assert subTaskModel != null;
-                                        switch (subTaskModel.getType()) {
-                                            case OUPUT:
-                                                airavataTask = new OutputDataStagingTask();
-                                                break;
-                                            case ARCHIVE_OUTPUT:
-                                                airavataTask = new ArchiveTask();
-                                                break;
-                                        }
-                                    }
-                                }
-
-                                if (airavataTask != null) {
-                                    airavataTask.setGatewayId(experimentModel.getGatewayId());
-                                    airavataTask.setExperimentId(experimentModel.getExperimentId());
-                                    airavataTask.setProcessId(processModel.getProcessId());
-                                    airavataTask.setTaskId(taskModel.getTaskId());
-                                    if (allTasks.size() > 0) {
-                                        allTasks.get(allTasks.size() - 1).setNextTask(new
OutPort(airavataTask.getTaskId(), airavataTask));
-                                    }
-                                    allTasks.add(airavataTask);
-                                }
-                            }
-                        }
-
-                        CompletingTask completingTask = new CompletingTask();
-                        completingTask.setGatewayId(experimentModel.getGatewayId());
-                        completingTask.setExperimentId(experimentModel.getExperimentId());
-                        completingTask.setProcessId(processModel.getProcessId());
-                        completingTask.setTaskId("Completing-Task");
-                        completingTask.setSkipTaskStatusPublish(true);
-                        if (allTasks.size() > 0) {
-                            allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(completingTask.getTaskId(),
completingTask));
-                        }
-                        allTasks.add(completingTask);
-
-                        String workflowName = getWorkflowOperator().launchWorkflow(processId
+ "-POST-" + UUID.randomUUID().toString(),
-                                new ArrayList<>(allTasks), true, false);
-                        try {
-                            MonitoringUtil.registerWorkflow(getCuratorClient(), processId,
workflowName);
-                        } catch (Exception e) {
-                            logger.error("Failed to save workflow " + workflowName + " of
process " + processId + " in zookeeper registry. " +
-                                    "This will affect cancellation tasks", e);
-                        }
+                        executePostWorkflow(processId, gateway);
 
                     } else if (jobStatusResult.getState() == JobState.CANCELED) {
                         logger.info("Job " + jobStatusResult.getJobId() + " was externally
cancelled but process is not marked as cancelled yet");
@@ -247,6 +169,89 @@ public class PostWorkflowManager extends WorkflowManager {
         }
     }
 
+    private void executePostWorkflow(String processId, String gateway) throws Exception {
+
+        RegistryService.Client registryClient = getRegistryClientPool().getResource();
+
+        ProcessModel processModel;
+        ExperimentModel experimentModel;
+        try {
+            processModel = registryClient.getProcess(processId);
+            experimentModel = registryClient.getExperiment(processModel.getExperimentId());
+            getRegistryClientPool().returnResource(registryClient);
+
+        } catch (Exception e) {
+            logger.error("Failed to fetch experiment or process from registry associated
with process id " + processId, e);
+            getRegistryClientPool().returnResource(registryClient);
+            throw new Exception("Failed to fetch experiment or process from registry associated
with process id " + processId, e);
+        }
+
+        String taskDag = processModel.getTaskDag();
+        List<TaskModel> taskList = processModel.getTasks();
+
+        String[] taskIds = taskDag.split(",");
+        final List<AiravataTask> allTasks = new ArrayList<>();
+
+        boolean jobSubmissionFound = false;
+
+        for (String taskId : taskIds) {
+            Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst();
+
+            if (model.isPresent()) {
+                TaskModel taskModel = model.get();
+                AiravataTask airavataTask = null;
+                if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
+                    jobSubmissionFound = true;
+                } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
+                    if (jobSubmissionFound) {
+                        DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskModel);
+                        assert subTaskModel != null;
+                        switch (subTaskModel.getType()) {
+                            case OUPUT:
+                                airavataTask = new OutputDataStagingTask();
+                                break;
+                            case ARCHIVE_OUTPUT:
+                                airavataTask = new ArchiveTask();
+                                break;
+                        }
+                    }
+                }
+
+                if (airavataTask != null) {
+                    airavataTask.setGatewayId(experimentModel.getGatewayId());
+                    airavataTask.setExperimentId(experimentModel.getExperimentId());
+                    airavataTask.setProcessId(processModel.getProcessId());
+                    airavataTask.setTaskId(taskModel.getTaskId());
+                    if (allTasks.size() > 0) {
+                        allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(airavataTask.getTaskId(),
airavataTask));
+                    }
+                    allTasks.add(airavataTask);
+                }
+            }
+        }
+
+        CompletingTask completingTask = new CompletingTask();
+        completingTask.setGatewayId(experimentModel.getGatewayId());
+        completingTask.setExperimentId(experimentModel.getExperimentId());
+        completingTask.setProcessId(processModel.getProcessId());
+        completingTask.setTaskId("Completing-Task");
+        completingTask.setSkipTaskStatusPublish(true);
+        if (allTasks.size() > 0) {
+            allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(completingTask.getTaskId(),
completingTask));
+        }
+        allTasks.add(completingTask);
+
+        String workflowName = getWorkflowOperator().launchWorkflow(processId + "-POST-" +
UUID.randomUUID().toString(),
+                new ArrayList<>(allTasks), true, false);
+        try {
+            MonitoringUtil.registerWorkflow(getCuratorClient(), processId, workflowName);
+        } catch (Exception e) {
+            logger.error("Failed to save workflow " + workflowName + " of process " + processId
+ " in zookeeper registry. " +
+                    "This will affect cancellation tasks", e);
+        }
+
+    }
+
     private void runConsumer() throws ApplicationSettingsException {
         final Consumer<String, JobStatusResult> consumer = createConsumer();
 


Mime
View raw message