airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dimuthu...@apache.org
Subject [airavata] branch develop updated: Graceful job cancellation if the Default Job Submission failed in the middle
Date Tue, 10 Apr 2018 18:09:56 GMT
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1b950bd  Graceful job cancellation if the Default Job Submission failed in the middle
1b950bd is described below

commit 1b950bdb5b96f046e4fbaac6e7024b158dd86e7a
Author: dimuthu <dimuthu.upeksha2@gmail.com>
AuthorDate: Tue Apr 10 14:09:49 2018 -0400

    Graceful job cancellation if the Default Job Submission failed in the middle
---
 .../task/submission/DefaultJobSubmissionTask.java  |  74 +++++++----
 .../impl/task/submission/JobSubmissionTask.java    |  35 ++----
 .../helix/impl/workflow/PostWorkflowManager.java   | 101 ++-------------
 .../airavata/helix/core/util/MonitoringUtil.java   | 137 +++++++++++++++++++++
 .../apache/airavata/helix/core/util/TaskUtil.java  |   4 +
 5 files changed, 214 insertions(+), 137 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
index 84b331b..12e7853 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
@@ -22,6 +22,7 @@ package org.apache.airavata.helix.impl.task.submission;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.JobSubmissionOutput;
 import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.helix.core.util.MonitoringUtil;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder;
 import org.apache.airavata.helix.impl.task.submission.config.GroovyMapData;
@@ -50,6 +51,20 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
     @Override
     public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
 
+        String jobId = null;
+        AgentAdaptor adaptor;
+
+        try {
+            adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                    getTaskContext().getGatewayId(),
+                    getTaskContext().getComputeResourceId(),
+                    getTaskContext().getJobSubmissionProtocol(),
+                    getTaskContext().getComputeResourceCredentialToken(),
+                    getTaskContext().getComputeResourceLoginUserName());
+        } catch (Exception e) {
+            return onFail("Failed to fetch adaptor to connect to " + getTaskContext().getComputeResourceId(),
true, e);
+        }
+
         try {
             saveAndPublishProcessStatus(ProcessState.EXECUTING);
 
@@ -63,13 +78,6 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
             jobModel.setJobName(mapData.getJobName());
             jobModel.setJobDescription("Sample description");
 
-            AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
-                    getTaskContext().getGatewayId(),
-                    getTaskContext().getComputeResourceId(),
-                    getTaskContext().getJobSubmissionProtocol(),
-                    getTaskContext().getComputeResourceCredentialToken(),
-                    getTaskContext().getComputeResourceLoginUserName());
-
             JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
 
             jobModel.setJobDescription(submissionOutput.getDescription());
@@ -77,7 +85,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
             jobModel.setStdErr(submissionOutput.getStdErr());
             jobModel.setStdOut(submissionOutput.getStdOut());
 
-            String jobId = submissionOutput.getJobId();
+            jobId = submissionOutput.getJobId();
 
             if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed())
{
 
@@ -136,21 +144,8 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                     jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                     jobModel.setJobStatuses(Collections.singletonList(jobStatus));
                     saveAndPublishJobStatus(jobModel);
-                    createMonitoringNode(jobId, mapData.getJobName());
-                }
-
-                if (getComputeResourceDescription().isGatewayUsageReporting()){
-                    String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand();
-                    String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable();
-                    ExperimentModel experiment = getRegistryServiceClient().getExperiment(getExperimentId());
-                    String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId();
-                    RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " &&
" + usageExecutable + " -gateway_user " +  username  +
-                            " -submit_time \"`date '+%F %T %:z'`\"  -jobid " + jobId );
-                    adaptor.executeCommand(rawCommandInfo.getRawCommand(), null);
                 }
 
-                return onSuccess("Submitted job to compute resource");
-
             } else {
 
                 int verificationTryCount = 0;
@@ -183,11 +178,46 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                         "doesn't return a valid JobId. " + "Hence changing experiment state
to Failed";
                 logger.error(msg);
                 return onFail("Couldn't find job id in both submitted and verified steps.
" + msg, false, null);
+
             } else {
-                return onSuccess("Submitted job to compute resource after retry");
+
+                // creating monitoring nodes
+                MonitoringUtil.createMonitoringNode(getCuratorClient(), jobId, mapData.getJobName(),
getTaskId(),
+                        getProcessId(), getExperimentId(), getGatewayId());
+
+                // usage reporting as the last step of job submission task
+                if (getComputeResourceDescription().isGatewayUsageReporting()){
+                    String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand();
+                    String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable();
+                    ExperimentModel experiment = getRegistryServiceClient().getExperiment(getExperimentId());
+                    String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId();
+                    RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " &&
" + usageExecutable + " -gateway_user " +  username  +
+                            " -submit_time \"`date '+%F %T %:z'`\"  -jobid " + jobId );
+                    adaptor.executeCommand(rawCommandInfo.getRawCommand(), null);
+                }
+
+                return onSuccess("Submitted job to compute resource");
             }
 
         } catch (Exception e) {
+
+            logger.error("Task failed due to unexpected issue. Trying to control damage");
+
+            if (jobId != null && !jobId.isEmpty()) {
+                logger.warn("Job " + jobId + " has already being submitted. Trying to cancel
the job");
+                try {
+                    boolean cancelled = cancelJob(adaptor, jobId);
+                    if (cancelled) {
+                        logger.info("Job " + jobId + " cancellation triggered");
+                    } else {
+                        logger.error("Failed to cancel job " + jobId + ". Please contact
system admins");
+                    }
+                } catch (Exception e1) {
+                    logger.error("Error while cancelling the job " + jobId + ". Please contact
system admins");
+                    // ignore as we have nothing to do at this point
+                }
+            }
+
             return onFail("Task failed due to unexpected issue", false, e);
         }
     }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 1ae60bb..3409b96 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -57,28 +57,6 @@ public abstract class JobSubmissionTask extends AiravataTask {
         super.init(manager, workflowName, jobName, taskName);
     }
 
-    // TODO perform exception handling
-    @SuppressWarnings("WeakerAccess")
-    protected void createMonitoringNode(String jobId, String jobName) throws Exception {
-        logger.info("Creating zookeeper paths for job monitoring for job id : " + jobId +
", process : "
-                + getProcessId() + ", gateway : " + getGatewayId());
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/monitoring/" + jobId + "/lock", new byte[0]);
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/monitoring/" + jobId + "/gateway", getGatewayId().getBytes());
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/monitoring/" + jobId + "/process", getProcessId().getBytes());
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/monitoring/" + jobId + "/task", getTaskId().getBytes());
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/monitoring/" + jobId + "/experiment", getExperimentId().getBytes());
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/monitoring/" + jobId + "/jobName", jobName.getBytes());
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/monitoring/" + jobName + "/jobId", jobId.getBytes());
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/registry/" + getProcessId() + "/jobs/" + jobId, new byte[0]);
-    }
 
     @SuppressWarnings("WeakerAccess")
     protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, GroovyMapData
groovyMapData, String workingDirectory) throws Exception {
@@ -179,13 +157,20 @@ public abstract class JobSubmissionTask extends AiravataTask {
         return new File(outputPath + getProcessId());
     }
 
+    public boolean cancelJob(AgentAdaptor agentAdaptor, String jobId) throws Exception {
+        JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
+                getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(),
getTaskContext().getPreferredJobSubmissionInterface()));
+        CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getCancelCommand(jobId).getRawCommand(),
null);
+        return commandOutput.getExitCode() == 0;
+    }
+
     @SuppressWarnings("WeakerAccess")
-    public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobID) throws Exception
{
+    public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobIc) throws Exception
{
         JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
                 getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(),
getTaskContext().getPreferredJobSubmissionInterface()));
-        CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobID).getRawCommand(),
null);
+        CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobIc).getRawCommand(),
null);
 
-        return jobManagerConfiguration.getParser().parseJobStatus(jobID, commandOutput.getStdOut());
+        return jobManagerConfiguration.getParser().parseJobStatus(jobIc, commandOutput.getStdOut());
 
     }
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 7c85d31..9a8a1f3 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -24,12 +24,12 @@ import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.core.util.MonitoringUtil;
 import org.apache.airavata.helix.impl.task.*;
 import org.apache.airavata.helix.impl.task.completing.CompletingTask;
 import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
 import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
 import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.monitor.JobStateValidator;
 import org.apache.airavata.monitor.JobStatusResult;
 import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
@@ -48,8 +48,6 @@ import org.apache.airavata.registry.api.RegistryService;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,84 +79,6 @@ public class PostWorkflowManager extends WorkflowManager {
         return consumer;
     }
 
-    private void registerWorkflow(String processId, String workflowId) throws Exception {
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/registry/" + processId + "/workflows/" + workflowId , new byte[0]);
-    }
-
-    private String getExperimentIdByJobId(String jobId) throws Exception {
-        String path = "/monitoring/" + jobId + "/experiment";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            byte[] processBytes = getCuratorClient().getData().forPath(path);
-            return new String(processBytes);
-        } else {
-            return null;
-        }
-    }
-
-    private String getTaskIdByJobId(String jobId) throws Exception {
-        String path = "/monitoring/" + jobId + "/task";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            byte[] processBytes = getCuratorClient().getData().forPath(path);
-            return new String(processBytes);
-        } else {
-            return null;
-        }
-    }
-
-    private String getProcessIdByJobId(String jobId) throws Exception {
-        String path = "/monitoring/" + jobId + "/process";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            byte[] processBytes = getCuratorClient().getData().forPath(path);
-            return new String(processBytes);
-        } else {
-            return null;
-        }
-    }
-
-    private String getGatewayByJobId(String jobId) throws Exception {
-        String path = "/monitoring/" + jobId + "/gateway";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            byte[] gatewayBytes = getCuratorClient().getData().forPath(path);
-            return new String(gatewayBytes);
-        } else {
-            return null;
-        }
-    }
-
-    private void updateStatusOfJob(String jobId, JobState jobState) throws Exception {
-        String path = "/monitoring/" + jobId + "/status";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            getCuratorClient().delete().forPath(path);
-        }
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
jobState.name().getBytes());
-    }
-
-    private JobState getCurrentStatusOfJob(String jobId) throws Exception {
-        String path = "/monitoring/" + jobId + "/status";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            byte[] gatewayBytes = getCuratorClient().getData().forPath(path);
-            return JobState.valueOf(new String(gatewayBytes));
-        } else {
-            return null;
-        }
-    }
-
-    private String getStatusOfProcess(String processId) throws Exception {
-        String path = "/registry/" + processId + "/status";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            byte[] statusBytes = getCuratorClient().getData().forPath(path);
-            return new String(statusBytes);
-        } else {
-            return null;
-        }
-    }
-
-    private boolean hasMonitoringRegistered(String jobId) throws Exception {
-        Stat stat = getCuratorClient().checkExists().forPath("/monitoring/" + jobId);
-        return stat != null;
-    }
-
     private boolean process(JobStatusResult jobStatusResult) {
 
         if (jobStatusResult == null) {
@@ -169,28 +89,28 @@ public class PostWorkflowManager extends WorkflowManager {
         try {
             logger.info("Processing job result of job id " + jobStatusResult.getJobId() +
" sent by " + jobStatusResult.getPublisherName());
 
-            if (hasMonitoringRegistered(jobStatusResult.getJobId())) {
+            if (MonitoringUtil.hasMonitoringRegistered(getCuratorClient(), jobStatusResult.getJobId()))
{
 
-                JobState currentJobStatus = getCurrentStatusOfJob(jobStatusResult.getJobId());
+                JobState currentJobStatus = MonitoringUtil.getCurrentStatusOfJob(getCuratorClient(),
jobStatusResult.getJobId());
                 if (!JobStateValidator.isValid(currentJobStatus, jobStatusResult.getState()))
{
                     logger.warn("Job state of " + jobStatusResult.getJobId() + " is not valid.
Previous state " +
                             currentJobStatus + ", new state " + jobStatusResult.getState());
                     return true;
                 }
 
-                String gateway = Optional.ofNullable(getGatewayByJobId(jobStatusResult.getJobId()))
+                String gateway = Optional.ofNullable(MonitoringUtil.getGatewayByJobId(getCuratorClient(),
jobStatusResult.getJobId()))
                         .orElseThrow(() -> new Exception("Can not find the gateway for
job id " + jobStatusResult.getJobId()));
 
-                String processId = Optional.ofNullable(getProcessIdByJobId(jobStatusResult.getJobId()))
+                String processId = Optional.ofNullable(MonitoringUtil.getProcessIdByJobId(getCuratorClient(),
jobStatusResult.getJobId()))
                         .orElseThrow(() -> new Exception("Can not find the process for
job id " + jobStatusResult.getJobId()));
 
-                String experimentId = Optional.ofNullable(getExperimentIdByJobId(jobStatusResult.getJobId()))
+                String experimentId = Optional.ofNullable(MonitoringUtil.getExperimentIdByJobId(getCuratorClient(),
jobStatusResult.getJobId()))
                         .orElseThrow(() -> new Exception("Can not find the experiment
for job id " + jobStatusResult.getJobId()));
 
-                String task = Optional.ofNullable(getTaskIdByJobId(jobStatusResult.getJobId()))
+                String task = Optional.ofNullable(MonitoringUtil.getTaskIdByJobId(getCuratorClient(),
jobStatusResult.getJobId()))
                         .orElseThrow(() -> new Exception("Can not find the task for job
id " + jobStatusResult.getJobId()));
 
-                String processStatus = getStatusOfProcess(processId);
+                String processStatus = MonitoringUtil.getStatusOfProcess(getCuratorClient(),
processId);
 
                 logger.info("Updating the job status for job id : " + jobStatusResult.getJobId()
+ " with process id "
                         + processId + ", exp id " + experimentId + ", gateway " + gateway
+ " and status " + jobStatusResult.getState().name());
@@ -258,6 +178,7 @@ public class PostWorkflowManager extends WorkflowManager {
                                 } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING)
{
                                     if (jobSubmissionFound) {
                                         DataStagingTaskModel subTaskModel = (DataStagingTaskModel)
ThriftUtils.getSubTaskModel(taskModel);
+                                        assert subTaskModel != null;
                                         switch (subTaskModel.getType()) {
                                             case OUPUT:
                                                 airavataTask = new OutputDataStagingTask();
@@ -296,7 +217,7 @@ public class PostWorkflowManager extends WorkflowManager {
                         String workflowName = getWorkflowOperator().launchWorkflow(processId
+ "-POST-" + UUID.randomUUID().toString(),
                                 new ArrayList<>(allTasks), true, false);
                         try {
-                            registerWorkflow(processId, workflowName);
+                            MonitoringUtil.registerWorkflow(getCuratorClient(), processId,
workflowName);
                         } catch (Exception e) {
                             logger.error("Failed to save workflow " + workflowName + " of
process " + processId + " in zookeeper registry. " +
                                     "This will affect cancellation tasks", e);
@@ -379,7 +300,7 @@ public class PostWorkflowManager extends WorkflowManager {
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
             getStatusPublisher().publish(msgCtx);
 
-            updateStatusOfJob(jobId, jobState);
+            MonitoringUtil.updateStatusOfJob(getCuratorClient(), jobId, jobState);
         } catch (Exception e) {
             throw new Exception("Error persisting job status " + e.getLocalizedMessage(),
e);
         }
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
new file mode 100644
index 0000000..1a77ad0
--- /dev/null
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
@@ -0,0 +1,137 @@
+package org.apache.airavata.helix.core.util;
+
+import org.apache.airavata.model.status.JobState;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MonitoringUtil {
+
+    private final static Logger logger = LoggerFactory.getLogger(MonitoringUtil.class);
+
+    private static final String MONITORING = "/monitoring/";
+    private static final String REGISTRY = "/registry/";
+
+    private static final String EXPERIMENT = "/experiment";
+    private static final String TASK = "/task";
+    private static final String PROCESS = "/process";
+    private static final String GATEWAY = "/gateway";
+    private static final String LOCK = "/lock";
+
+    private static final String STATUS = "/status";
+    private static final String JOBS = "/jobs";
+    private static final String JOB_ID = "/jobId";
+    private static final String JOB_NAME = "/jobName";
+    private static final String WORKFLOWS = "/workflows";
+
+
+    public static String getExperimentIdByJobId(CuratorFramework curatorClient, String jobId)
throws Exception {
+        String path = MONITORING + jobId + EXPERIMENT;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = curatorClient.getData().forPath(path);
+            return new String(processBytes);
+        } else {
+            return null;
+        }
+    }
+
+    public static String getTaskIdByJobId(CuratorFramework curatorClient, String jobId) throws
Exception {
+        String path = MONITORING + jobId + TASK;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = curatorClient.getData().forPath(path);
+            return new String(processBytes);
+        } else {
+            return null;
+        }
+    }
+
+    public static String getProcessIdByJobId(CuratorFramework curatorClient, String jobId)
throws Exception {
+        String path = MONITORING + jobId + PROCESS;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] processBytes = curatorClient.getData().forPath(path);
+            return new String(processBytes);
+        } else {
+            return null;
+        }
+    }
+
+    public static String getGatewayByJobId(CuratorFramework curatorClient, String jobId)
throws Exception {
+        String path = MONITORING + jobId + GATEWAY;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] gatewayBytes = curatorClient.getData().forPath(path);
+            return new String(gatewayBytes);
+        } else {
+            return null;
+        }
+    }
+
+    public static void updateStatusOfJob(CuratorFramework curatorClient, String jobId, JobState
jobState) throws Exception {
+        String path = MONITORING + jobId + STATUS;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            curatorClient.delete().forPath(path);
+        }
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
jobState.name().getBytes());
+    }
+
+    public static JobState getCurrentStatusOfJob(CuratorFramework curatorClient, String jobId)
throws Exception {
+        String path = MONITORING + jobId + STATUS;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] gatewayBytes = curatorClient.getData().forPath(path);
+            return JobState.valueOf(new String(gatewayBytes));
+        } else {
+            return null;
+        }
+    }
+
+    public static boolean hasMonitoringRegistered(CuratorFramework curatorClient, String
jobId) throws Exception {
+        Stat stat = curatorClient.checkExists().forPath(MONITORING + jobId);
+        return stat != null;
+    }
+
+    // TODO perform exception handling
+    @SuppressWarnings("WeakerAccess")
+    public static void createMonitoringNode(CuratorFramework curatorClient, String jobId,
String jobName, String taskId, String processId,
+                                            String experimentId, String gateway) throws Exception
{
+        logger.info("Creating zookeeper paths for job monitoring for job id : " + jobId +
", process : "
+                + processId + ", gateway : " + gateway);
+
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                MONITORING + jobId + LOCK, new byte[0]);
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                MONITORING + jobId + GATEWAY, gateway.getBytes());
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                MONITORING + jobId + PROCESS, processId.getBytes());
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                MONITORING + jobId + TASK, taskId.getBytes());
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                MONITORING + jobId + EXPERIMENT, experimentId.getBytes());
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                MONITORING + jobId + JOB_NAME, jobName.getBytes());
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                MONITORING + jobName + JOB_ID, jobId.getBytes());
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                REGISTRY + processId + JOBS + "/" + jobId, new byte[0]);
+    }
+
+    public static void deleteMonitoringNode(CuratorFramework curatorClient, String jobId)
{
+        logger.info("Deleting zookeeper paths in job monitoring for job id : " + jobId);
+
+    }
+
+    public static void registerWorkflow(CuratorFramework curatorClient, String processId,
String workflowId) throws Exception {
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                REGISTRY + processId + WORKFLOWS + "/" + workflowId , new byte[0]);
+    }
+
+    public static String getStatusOfProcess(CuratorFramework curatorClient, String processId)
throws Exception {
+        String path = REGISTRY + processId + STATUS;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            byte[] statusBytes = curatorClient.getData().forPath(path);
+            return new String(statusBytes);
+        } else {
+            return null;
+        }
+    }
+}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index 4514f3a..52f4069 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -23,6 +23,8 @@ import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
 import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Field;
 import java.util.*;
@@ -35,6 +37,8 @@ import java.util.*;
  */
 public class TaskUtil {
 
+    private final static Logger logger = LoggerFactory.getLogger(TaskUtil.class);
+
     public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T
taskObj) throws IllegalAccessException {
 
         List<OutPort> outPorts = new ArrayList<>();

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.

Mime
View raw message