airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From goshe...@apache.org
Subject airavata git commit: Add DefaultJobSubmission Task impl
Date Thu, 13 Apr 2017 16:32:42 GMT
Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt aeaf35b46 -> 30e1f0ee8


Add DefaultJobSubmission Task impl


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/30e1f0ee
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/30e1f0ee
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/30e1f0ee

Branch: refs/heads/feature-workload-mgmt
Commit: 30e1f0ee80efb8679d72914ffb70f80d0791623f
Parents: aeaf35b
Author: Gourav Shenoy <goshenoy@apache.org>
Authored: Thu Apr 13 12:32:36 2017 -0400
Committer: Gourav Shenoy <goshenoy@apache.org>
Committed: Thu Apr 13 12:32:36 2017 -0400

----------------------------------------------------------------------
 modules/worker/pom.xml                          |   4 +-
 modules/worker/task-jobsubmission/pom.xml       |  10 +
 .../impl/DefaultJobSubmissionTask.java          | 115 ++--
 .../task/jobsubmission/utils/GroovyMap.java     | 111 ++++
 .../jobsubmission/utils/JobSubmissionUtils.java | 602 +++++++++++++++++++
 .../worker/task/jobsubmission/utils/Script.java |  79 +++
 6 files changed, 864 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/30e1f0ee/modules/worker/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/pom.xml b/modules/worker/pom.xml
index 3579963..e2681b8 100644
--- a/modules/worker/pom.xml
+++ b/modules/worker/pom.xml
@@ -24,12 +24,12 @@
                 <activeByDefault>true</activeByDefault>
             </activation>
             <modules>
-                <module>task-env-setup</module>
+                <module>task-envsetup</module>
             </modules>
         </profile>
     </profiles>
     <modules>
-        <module>task-job-submission</module>
+        <module>task-jobsubmission</module>
     </modules>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

http://git-wip-us.apache.org/repos/asf/airavata/blob/30e1f0ee/modules/worker/task-jobsubmission/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/pom.xml b/modules/worker/task-jobsubmission/pom.xml
index 600512e..dfdf734 100644
--- a/modules/worker/task-jobsubmission/pom.xml
+++ b/modules/worker/task-jobsubmission/pom.xml
@@ -26,6 +26,16 @@
             <artifactId>airavata-data-models</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-templates</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/30e1f0ee/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
index beeb66e..a4cf5c7 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
@@ -19,19 +19,10 @@
  *
 */
 
-package org.apache.airavata.gfac.impl.task;
+package org.apache.airavata.worker.task.jobsubmission.impl;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.gfac.core.*;
-import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
-import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import org.apache.airavata.model.commons.ErrorModel;
@@ -41,6 +32,19 @@ import org.apache.airavata.model.status.*;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.worker.commons.cluster.JobSubmissionOutput;
+import org.apache.airavata.worker.commons.cluster.RawCommandInfo;
+import org.apache.airavata.worker.commons.cluster.RemoteCluster;
+import org.apache.airavata.worker.commons.context.ProcessContext;
+import org.apache.airavata.worker.commons.context.TaskContext;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.airavata.worker.commons.task.TaskException;
+import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
+import org.apache.airavata.worker.commons.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
+import org.apache.airavata.worker.task.jobsubmission.utils.Script;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +57,8 @@ import java.util.List;
 import java.util.Map;
 
 public class DefaultJobSubmissionTask implements JobSubmissionTask {
-	private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
+
+	private static final Logger logger = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
 	public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
 	private static int waitForProcessIdmillis = 5000;
 	private static int pauseTimeInSec = waitForProcessIdmillis / 1000;
@@ -71,16 +76,16 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		    JobModel jobModel = processContext.getJobModel();
 		    jobModel.setTaskId(taskContext.getTaskId());
 		    RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
-			GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext);
+			GroovyMap groovyMap = JobSubmissionUtils.createGroovyMap(processContext, taskContext);
 			groovyMap.getStringValue(Script.JOB_NAME).
 					ifPresent(jobName -> jobModel.setJobName(jobName));
-			ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+			ResourceJobManager resourceJobManager = JobSubmissionUtils.getResourceJobManager(processContext);
 		    JobManagerConfiguration jConfig = null;
 		    if (resourceJobManager != null) {
-			    jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+			    jConfig = JobSubmissionUtils.getJobManagerConfiguration(resourceJobManager);
 		    }
 		    JobStatus jobStatus = new JobStatus();
-		    File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+		    File jobFile = JobSubmissionUtils.createJobFile(groovyMap, taskContext, jConfig);
 		    if (jobFile != null && jobFile.exists()) {
 			    jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
 			    JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
@@ -98,22 +103,22 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 						statusList.add(new JobStatus(JobState.FAILED));
 						statusList.get(0).setReason(jobSubmissionOutput.getFailureReason());
 						jobModel.setJobStatuses(statusList);
-						GFacUtils.saveJobModel(processContext, jobModel);
-						log.error("expId: {}, processid: {}, taskId: {} :- Job submission failed for job name {}",
+						JobSubmissionUtils.saveJobModel(processContext, jobModel);
+						logger.error("expId: {}, processid: {}, taskId: {} :- Job submission failed for job name {}",
                                 experimentId, taskContext.getProcessId(), taskContext.getTaskId(), jobModel.getJobName());
 						ErrorModel errorModel = new ErrorModel();
 						errorModel.setUserFriendlyMessage(jobSubmissionOutput.getFailureReason());
 						errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
-						GFacUtils.saveExperimentError(processContext, errorModel);
-						GFacUtils.saveProcessError(processContext, errorModel);
-						GFacUtils.saveTaskError(taskContext, errorModel);
+						WorkerUtils.saveExperimentError(processContext, errorModel);
+						WorkerUtils.saveProcessError(processContext, errorModel);
+						WorkerUtils.saveTaskError(taskContext, errorModel);
 						taskStatus.setState(TaskState.FAILED);
 						taskStatus.setReason("Job submission command didn't return a jobId");
 						taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 						taskContext.setTaskStatus(taskStatus);
 					} else {
 						String msg;
-						GFacUtils.saveJobModel(processContext, jobModel);
+						JobSubmissionUtils.saveJobModel(processContext, jobModel);
 						ErrorModel errorModel = new ErrorModel();
 						if (exitCode != Integer.MIN_VALUE) {
 							msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
@@ -130,38 +135,38 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 									" stderr -> " + jobSubmissionOutput.getStdErr() + " Hence changing job state to Failed." ;
 							errorModel.setActualErrorMessage(msg);
 						}
-						log.error(msg);
+						logger.error(msg);
 						errorModel.setUserFriendlyMessage(msg);
-						GFacUtils.saveExperimentError(processContext, errorModel);
-						GFacUtils.saveProcessError(processContext, errorModel);
-						GFacUtils.saveTaskError(taskContext, errorModel);
+						WorkerUtils.saveExperimentError(processContext, errorModel);
+						WorkerUtils.saveProcessError(processContext, errorModel);
+						WorkerUtils.saveTaskError(taskContext, errorModel);
 						taskStatus.setState(TaskState.FAILED);
 						taskStatus.setReason(msg);
 						taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 						taskContext.setTaskStatus(taskStatus);
 					}
 					try {
-						GFacUtils.saveAndPublishTaskStatus(taskContext);
-					} catch (GFacException e) {
-						log.error("Error while saving task status", e);
+						WorkerUtils.saveAndPublishTaskStatus(taskContext);
+					} catch (WorkerException e) {
+						logger.error("Error while saving task status", e);
 					}
 					return taskStatus;
 				} else if (jobId != null && !jobId.isEmpty()) {
 				    jobModel.setJobId(jobId);
-				    GFacUtils.saveJobModel(processContext, jobModel);
+					JobSubmissionUtils.saveJobModel(processContext, jobModel);
 				    jobStatus.setJobState(JobState.SUBMITTED);
                     ComputeResourceDescription computeResourceDescription = taskContext.getParentProcessContext()
                             .getComputeResourceDescription();
                     jobStatus.setReason("Successfully Submitted to " + computeResourceDescription.getHostName());
                     jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 				    jobModel.setJobStatuses(Arrays.asList(jobStatus));
-				    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+					WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 				    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
 					    jobStatus.setJobState(JobState.QUEUED);
 					    jobStatus.setReason("Verification step succeeded");
                         jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 					    jobModel.setJobStatuses(Arrays.asList(jobStatus));
-					    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+						WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 				    }
                     // doing gateway reporting
                     if (computeResourceDescription.isGatewayUsageReporting()){
@@ -185,40 +190,40 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 							// JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
 							jobId = verifyJobId;
 							jobModel.setJobId(jobId);
-							GFacUtils.saveJobModel(processContext, jobModel);
+							JobSubmissionUtils.saveJobModel(processContext, jobModel);
 							jobStatus.setJobState(JobState.QUEUED);
 							jobStatus.setReason("Verification step succeeded");
 							jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 							jobModel.setJobStatuses(Arrays.asList(jobStatus));
-							GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+							WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 							taskStatus.setState(TaskState.COMPLETED);
 							taskStatus.setReason("Submitted job to compute resource");
 							taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 							break;
 						}
-						log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+						logger.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
 						Thread.sleep(verificationTryCount * 10000);
 					}
 				}
 
 			    if (jobId == null || jobId.isEmpty()) {
 					jobModel.setJobId(DEFAULT_JOB_ID);
-					GFacUtils.saveJobModel(processContext, jobModel);
+					JobSubmissionUtils.saveJobModel(processContext, jobModel);
 					String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
 						    "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
 						    "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
-				    log.error(msg);
+				    logger.error(msg);
                     ErrorModel errorModel = new ErrorModel();
                     errorModel.setUserFriendlyMessage(msg);
                     errorModel.setActualErrorMessage(msg);
-				    GFacUtils.saveExperimentError(processContext, errorModel);
-                    GFacUtils.saveProcessError(processContext, errorModel);
-                    GFacUtils.saveTaskError(taskContext, errorModel);
+					WorkerUtils.saveExperimentError(processContext, errorModel);
+					WorkerUtils.saveProcessError(processContext, errorModel);
+					WorkerUtils.saveTaskError(taskContext, errorModel);
 				    taskStatus.setState(TaskState.FAILED);
 				    taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
                     taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 			    }else {
-                    GFacUtils.saveJobModel(processContext, jobModel);
+					JobSubmissionUtils.saveJobModel(processContext, jobModel);
                 }
 		    } else {
 			    taskStatus.setState(TaskState.FAILED);
@@ -231,7 +236,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 
 	    } catch (AppCatalogException e) {
 		    String msg = "Error while instantiating app catalog";
-		    log.error(msg, e);
+		    logger.error(msg, e);
 		    taskStatus.setState(TaskState.FAILED);
 		    taskStatus.setReason(msg);
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -241,7 +246,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		    taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
 	    } catch (ApplicationSettingsException e) {
 		    String msg = "Error occurred while creating job descriptor";
-		    log.error(msg, e);
+		    logger.error(msg, e);
 		    taskStatus.setState(TaskState.FAILED);
 		    taskStatus.setReason(msg);
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -249,9 +254,9 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		    errorModel.setActualErrorMessage(e.getMessage());
 		    errorModel.setUserFriendlyMessage(msg);
 		    taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
-	    } catch (GFacException e) {
+	    } catch (WorkerException e) {
 		    String msg = "Error occurred while submitting the job";
-		    log.error(msg, e);
+		    logger.error(msg, e);
 		    taskStatus.setState(TaskState.FAILED);
 		    taskStatus.setReason(msg);
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -261,7 +266,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		    taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
 	    } catch (IOException e) {
 		    String msg = "Error while reading the content of the job file";
-		    log.error(msg, e);
+		    logger.error(msg, e);
 		    taskStatus.setState(TaskState.FAILED);
 		    taskStatus.setReason(msg);
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -271,7 +276,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		    taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
 	    } catch (InterruptedException e) {
 		    String msg = "Error occurred while verifying the job submission";
-		    log.error(msg, e);
+		    logger.error(msg, e);
 		    taskStatus.setState(TaskState.FAILED);
 		    taskStatus.setReason(msg);
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -281,7 +286,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		    taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
 		} catch (Throwable e) {
 			String msg = "JobSubmission failed";
-			log.error(msg, e);
+			logger.error(msg, e);
 			taskStatus.setState(TaskState.FAILED);
 			taskStatus.setReason(msg);
 			taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -293,14 +298,14 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 
         taskContext.setTaskStatus(taskStatus);
 	    try {
-		    GFacUtils.saveAndPublishTaskStatus(taskContext);
-	    } catch (GFacException e) {
-		    log.error("Error while saving task status", e);
+			WorkerUtils.saveAndPublishTaskStatus(taskContext);
+	    } catch (WorkerException e) {
+		    logger.error("Error while saving task status", e);
 	    }
 	    return taskStatus;
     }
 
-    private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws GFacException {
+    private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws WorkerException {
         JobStatus status = remoteCluster.getJobStatus(jobID);
         return status != null &&  status.getJobState() != JobState.UNKNOWN;
     }
@@ -310,8 +315,8 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
         String jobId = null;
         try {
             jobId  = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName());
-        } catch (GFacException e) {
-            log.error("Error while verifying JobId from JobName");
+        } catch (WorkerException e) {
+            logger.error("Error while verifying JobId from JobName");
         }
         return jobId;
     }
@@ -344,7 +349,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		if (jobModel != null) {
 			if (processContext.getProcessState() == ProcessState.EXECUTING) {
 				while (jobModel.getJobId() == null) {
-					log.info("Cancellation pause {} secs until process get jobId", pauseTimeInSec);
+					logger.info("Cancellation pause {} secs until process get jobId", pauseTimeInSec);
 					try {
 						Thread.sleep(waitForProcessIdmillis);
 					} catch (InterruptedException e) {
@@ -367,7 +372,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 					throw new TaskException("Cancel operation failed, Job status couldn't find in resource, JobId " +
 							jobModel.getJobId());
 				}
-			} catch ( GFacException | InterruptedException e) {
+			} catch ( WorkerException | InterruptedException e) {
 				throw new TaskException("Error while cancelling job " + jobModel.getJobId(), e);
 			}
 		} else {

http://git-wip-us.apache.org/repos/asf/airavata/blob/30e1f0ee/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/GroovyMap.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/GroovyMap.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/GroovyMap.java
new file mode 100644
index 0000000..a5971d1
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/GroovyMap.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.task.jobsubmission.utils;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.HashMap;
+import java.util.Optional;
+
+public class GroovyMap extends HashMap<String, Object> {
+
+
+    public GroovyMap() {
+        super();
+        // to mitigate groovy exception groovy.lang.MissingPropertyException: No such property: <name> for class: groovy.lang.Binding
+        addDefaultValues();
+    }
+
+    public GroovyMap add(Script name, Object value){
+        put(name.name, value);
+        return this;
+    }
+
+    @Override
+    public Object get(Object key) {
+        return super.getOrDefault(key, null);
+    }
+
+    public Object get(Script script) {
+        return get(script.name);
+    }
+
+    public Optional<String> getStringValue(Script script) {
+        Object obj = get(script);
+        if (obj instanceof String) {
+            return Optional.of((String) obj);
+        } else if (obj == null) {
+            return Optional.empty();
+        } else {
+            throw new IllegalArgumentException("Value is not String type");
+        }
+    }
+
+    private void addDefaultValues() {
+        this.add(Script.SHELL_NAME, null)
+                .add(Script.QUEUE_NAME, null)
+                .add(Script.NODES, null)
+                .add(Script.CPU_COUNT, null)
+                .add(Script.MAIL_ADDRESS, null)
+                .add(Script.ACCOUNT_STRING, null)
+                .add(Script.MAX_WALL_TIME, null)
+                .add(Script.JOB_NAME, null)
+                .add(Script.STANDARD_OUT_FILE, null)
+                .add(Script.STANDARD_ERROR_FILE, null)
+                .add(Script.QUALITY_OF_SERVICE, null)
+                .add(Script.RESERVATION, null)
+                .add(Script.EXPORTS, null)
+                .add(Script.MODULE_COMMANDS, null)
+                .add(Script.SCRATCH_LOCATION, null)
+                .add(Script.WORKING_DIR, null)
+                .add(Script.PRE_JOB_COMMANDS, null)
+                .add(Script.JOB_SUBMITTER_COMMAND, null)
+                .add(Script.EXECUTABLE_PATH, null)
+                .add(Script.INPUTS, null)
+                .add(Script.POST_JOB_COMMANDS, null)
+                .add(Script.USED_MEM, null)
+                .add(Script.PROCESS_PER_NODE, null)
+                .add(Script.CHASSIS_NAME, null)
+                .add(Script.INPUT_DIR, null)
+                .add(Script.OUTPUT_DIR, null)
+                .add(Script.USER_NAME, null)
+                .add(Script.GATEWAY_ID, null)
+                .add(Script.GATEWAY_USER_NAME, null)
+                .add(Script.APPLICATION_NAME, null);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/30e1f0ee/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
index 360464e..6e3ebbc 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
@@ -1,7 +1,609 @@
 package org.apache.airavata.worker.task.jobsubmission.utils;
 
+import groovy.lang.Writable;
+import groovy.text.GStringTemplateEngine;
+import groovy.text.TemplateEngine;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.CommandObject;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.parallelism.ApplicationParallelismType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.task.JobSubmissionTaskModel;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.worker.commons.cluster.OutputParser;
+import org.apache.airavata.worker.commons.config.ResourceConfig;
+import org.apache.airavata.worker.commons.context.ProcessContext;
+import org.apache.airavata.worker.commons.context.TaskContext;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
+import org.apache.airavata.worker.commons.utils.WorkerConstants;
+import org.apache.airavata.worker.commons.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.config.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 /**
  * Created by goshenoy on 4/12/17.
  */
 public class JobSubmissionUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(JobSubmissionUtils.class);
+    private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
+
+    public static GroovyMap createGroovyMap(ProcessContext processContext, TaskContext taskContext)
+            throws WorkerException, AppCatalogException, ApplicationSettingsException {
+
+        GroovyMap groovyMap = new GroovyMap();
+        ProcessModel processModel = processContext.getProcessModel();
+        ResourceJobManager resourceJobManager = getResourceJobManager(processContext);
+        setMailAddresses(processContext, groovyMap); // set email options and addresses
+
+        groovyMap.add(Script.INPUT_DIR, processContext.getInputDir());
+        groovyMap.add(Script.OUTPUT_DIR, processContext.getOutputDir());
+        groovyMap.add(Script.EXECUTABLE_PATH, processContext.getApplicationDeploymentDescription().getExecutablePath());
+        groovyMap.add(Script.STANDARD_OUT_FILE, processContext.getStdoutLocation());
+        groovyMap.add(Script.STANDARD_ERROR_FILE, processContext.getStderrLocation());
+        groovyMap.add(Script.SCRATCH_LOCATION, processContext.getScratchLocation());
+        groovyMap.add(Script.GATEWAY_ID, processContext.getGatewayId());
+        groovyMap.add(Script.GATEWAY_USER_NAME, processContext.getProcessModel().getUserName());
+        groovyMap.add(Script.APPLICATION_NAME, processContext.getApplicationInterfaceDescription().getApplicationName());
+
+        groovyMap.add(Script.ACCOUNT_STRING, processContext.getAllocationProjectNumber());
+        groovyMap.add(Script.RESERVATION, processContext.getReservation());
+
+        // To make job name alpha numeric
+        groovyMap.add(Script.JOB_NAME, "A" + String.valueOf(generateJobName()));
+        groovyMap.add(Script.WORKING_DIR, processContext.getWorkingDir());
+
+        List<String> inputValues = getProcessInputValues(processModel.getProcessInputs());
+        inputValues.addAll(getProcessOutputValues(processModel.getProcessOutputs()));
+        groovyMap.add(Script.INPUTS, inputValues);
+
+        groovyMap.add(Script.USER_NAME, processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName());
+        groovyMap.add(Script.SHELL_NAME, "/bin/bash");
+        // get walltime
+        if (taskContext != null) {
+            try {
+                JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
+                if (jobSubmissionTaskModel.getWallTime() > 0) {
+                    groovyMap.add(Script.MAX_WALL_TIME,
+                            JobSubmissionUtils.maxWallTimeCalculator(jobSubmissionTaskModel.getWallTime()));
+                }
+            } catch (TException e) {
+                log.error("Error while getting job submission sub task model", e);
+            }
+        }
+
+        // NOTE: Give precedence to data comes with experiment
+        // qos per queue
+        String qoS = getQoS(processContext.getQualityOfService(), processContext.getQueueName());
+        if (qoS != null) {
+            groovyMap.add(Script.QUALITY_OF_SERVICE, qoS);
+        }
+        ComputationalResourceSchedulingModel scheduling = processModel.getProcessResourceSchedule();
+        if (scheduling != null) {
+            int totalNodeCount = scheduling.getNodeCount();
+            int totalCPUCount = scheduling.getTotalCPUCount();
+
+            if (isValid(scheduling.getQueueName())) {
+                groovyMap.add(Script.QUEUE_NAME, scheduling.getQueueName());
+            }
+            if (totalNodeCount > 0) {
+                groovyMap.add(Script.NODES, totalNodeCount);
+            }
+            if (totalCPUCount > 0) {
+                int ppn = totalCPUCount / totalNodeCount;
+                groovyMap.add(Script.PROCESS_PER_NODE, ppn);
+                groovyMap.add(Script.CPU_COUNT, totalCPUCount);
+            }
+            // max wall time may be set before this level if jobsubmission task has wall time configured to this job,
+            // if so we ignore scheduling configuration.
+            if (scheduling.getWallTimeLimit() > 0 && groovyMap.get(Script.MAX_WALL_TIME) == null) {
+                groovyMap.add(Script.MAX_WALL_TIME,
+                        JobSubmissionUtils.maxWallTimeCalculator(scheduling.getWallTimeLimit()));
+                if (resourceJobManager != null) {
+                    if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) {
+                        groovyMap.add(Script.MAX_WALL_TIME,
+                                JobSubmissionUtils.maxWallTimeCalculator(scheduling.getWallTimeLimit()));
+                    }
+                }
+            }
+            if (scheduling.getTotalPhysicalMemory() > 0) {
+                groovyMap.add(Script.USED_MEM, scheduling.getTotalPhysicalMemory());
+            }
+            if (isValid(scheduling.getOverrideLoginUserName())) {
+                groovyMap.add(Script.USER_NAME, scheduling.getOverrideLoginUserName());
+            }
+            if (isValid(scheduling.getOverrideAllocationProjectNumber())) {
+                groovyMap.add(Script.ACCOUNT_STRING, scheduling.getOverrideAllocationProjectNumber());
+            }
+            if (isValid(scheduling.getStaticWorkingDir())) {
+                groovyMap.add(Script.WORKING_DIR, scheduling.getStaticWorkingDir());
+            }
+        } else {
+            log.error("Task scheduling cannot be null at this point..");
+        }
+
+        ApplicationDeploymentDescription appDepDescription = processContext.getApplicationDeploymentDescription();
+        List<CommandObject> moduleCmds = appDepDescription.getModuleLoadCmds();
+        if (moduleCmds != null) {
+            List<String> modulesCmdCollect = moduleCmds.stream()
+                    .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder())
+                    .map(map -> map.getCommand())
+                    .collect(Collectors.toList());
+            groovyMap.add(Script.MODULE_COMMANDS, modulesCmdCollect);
+        }
+
+        List<CommandObject> preJobCommands = appDepDescription.getPreJobCommands();
+        if (preJobCommands != null) {
+            List<String> preJobCmdCollect = preJobCommands.stream()
+                    .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder())
+                    .map(map -> parseCommands(map.getCommand(), groovyMap))
+                    .collect(Collectors.toList());
+            groovyMap.add(Script.PRE_JOB_COMMANDS, preJobCmdCollect);
+        }
+
+        List<CommandObject> postJobCommands = appDepDescription.getPostJobCommands();
+        if (postJobCommands != null) {
+            List<String> postJobCmdCollect = postJobCommands.stream()
+                    .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder())
+                    .map(map -> parseCommands(map.getCommand(), groovyMap))
+                    .collect(Collectors.toList());
+            groovyMap.add(Script.POST_JOB_COMMANDS, postJobCmdCollect);
+        }
+
+        ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+        if (parallelism != null) {
+            if (parallelism != ApplicationParallelismType.SERIAL) {
+                Map<ApplicationParallelismType, String> parallelismPrefix = processContext.getResourceJobManager().getParallelismPrefix();
+                if (parallelismPrefix != null){
+                    String parallelismCommand = parallelismPrefix.get(parallelism);
+                    if (parallelismCommand != null){
+                        groovyMap.add(Script.JOB_SUBMITTER_COMMAND, parallelismCommand);
+                    }else {
+                        throw new WorkerException("Parallelism prefix is not defined for given parallelism type " + parallelism + ".. Please define the parallelism prefix at App Catalog");
+                    }
+                }
+            }
+        }
+        return groovyMap;
+    }
+
+    public static ResourceJobManager getResourceJobManager(ProcessContext processContext) {
+        try {
+            JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(processContext);
+            JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(processContext);
+            if (submissionProtocol == JobSubmissionProtocol.SSH ) {
+                SSHJobSubmission sshJobSubmission = JobSubmissionUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (sshJobSubmission != null) {
+                    return sshJobSubmission.getResourceJobManager();
+                }
+            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
+                LOCALSubmission localJobSubmission = JobSubmissionUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (localJobSubmission != null) {
+                    return localJobSubmission.getResourceJobManager();
+                }
+            } else if (submissionProtocol == JobSubmissionProtocol.SSH_FORK){
+                SSHJobSubmission sshJobSubmission = JobSubmissionUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (sshJobSubmission != null) {
+                    return sshJobSubmission.getResourceJobManager();
+                }
+            }
+        } catch (AppCatalogException e) {
+            log.error("Error occured while retrieving resource job manager", e);
+        }
+        return null;
+    }
+
+    private static JobSubmissionProtocol getPreferredJobSubmissionProtocol(ProcessContext context) throws AppCatalogException {
+        try {
+            GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile();
+            String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
+            ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(context.getGatewayId()
+                    , resourceHostId);
+            return preference.getPreferredJobSubmissionProtocol();
+        } catch (AppCatalogException e) {
+            log.error("Error occurred while initializing app catalog", e);
+            throw new AppCatalogException("Error occurred while initializing app catalog", e);
+        }
+    }
+
+    public static File createJobFile(GroovyMap groovyMap, TaskContext tc, JobManagerConfiguration jMC)
+            throws WorkerException {
+        try {
+            int number = new SecureRandom().nextInt();
+            number = (number < 0 ? -number : number);
+            File tempJobFile = new File(JobSubmissionUtils.getLocalDataDir(tc), "job_" + Integer.toString(number) + jMC.getScriptExtension());
+            FileUtils.writeStringToFile(tempJobFile, generateScript(groovyMap, jMC.getJobDescriptionTemplateName()));
+            return tempJobFile;
+        } catch (IOException e) {
+            throw new WorkerException("Error while writing script content to temp file");
+        }
+    }
+
+    private static String generateScript(GroovyMap groovyMap, String templateName) throws WorkerException {
+        URL templateUrl = ApplicationSettings.loadFile(templateName);
+        if (templateUrl == null) {
+            String error = "Template file '" + templateName + "' not found";
+            throw new WorkerException(error);
+        }
+        File template = new File(templateUrl.getPath());
+        TemplateEngine engine = new GStringTemplateEngine();
+        Writable make;
+        try {
+            make = engine.createTemplate(template).make(groovyMap);
+        } catch (Exception e) {
+            throw new WorkerException("Error while generating script using groovy map");
+        }
+        return make.toString();
+    }
+
+    private static boolean isEmailBasedJobMonitor(ProcessContext processContext) throws WorkerException, AppCatalogException {
+        JobSubmissionProtocol jobSubmissionProtocol = getPreferredJobSubmissionProtocol(processContext);
+        JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(processContext);
+        if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) {
+            String jobSubmissionInterfaceId = jobSubmissionInterface.getJobSubmissionInterfaceId();
+            SSHJobSubmission sshJobSubmission = processContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+            return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
+        } else {
+            return false;
+        }
+    }
+
+    public static void saveJobModel(ProcessContext processContext, JobModel jobModel) throws WorkerException {
+        try {
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            experimentCatalog.add(ExpCatChildDataType.JOB, jobModel, processContext.getProcessId());
+        } catch (RegistryException e) {
+            String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+                    + " jobId: " + jobModel.getJobId() + " : - Error while saving Job Model";
+            throw new WorkerException(msg, e);
+        }
+    }
+
+    private static JobSubmissionInterface getPreferredJobSubmissionInterface(ProcessContext processContext) throws AppCatalogException {
+        try {
+            String resourceHostId = processContext.getComputeResourceDescription().getComputeResourceId();
+            JobSubmissionProtocol preferredJobSubmissionProtocol = processContext.getPreferredJobSubmissionProtocol();
+            ComputeResourceDescription resourceDescription = processContext.getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
+            Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>();
+            List<JobSubmissionInterface> interfaces = new ArrayList<>();
+            if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
+                for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces){
+
+                    if (preferredJobSubmissionProtocol != null){
+                        if (preferredJobSubmissionProtocol.toString().equals(submissionInterface.getJobSubmissionProtocol().toString())){
+                            if (orderedInterfaces.containsKey(submissionInterface.getJobSubmissionProtocol())){
+                                List<JobSubmissionInterface> interfaceList = orderedInterfaces.get(submissionInterface.getJobSubmissionProtocol());
+                                interfaceList.add(submissionInterface);
+                            }else {
+                                interfaces.add(submissionInterface);
+                                orderedInterfaces.put(submissionInterface.getJobSubmissionProtocol(), interfaces);
+                            }
+                        }
+                    }else {
+                        Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
+                            @Override
+                            public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+                                return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+                            }
+                        });
+                    }
+                }
+                interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol);
+                Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() {
+                    @Override
+                    public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+                        return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+                    }
+                });
+            } else {
+                throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
+            }
+            return interfaces.get(0);
+        } catch (AppCatalogException e) {
+            throw new AppCatalogException("Error occurred while retrieving data from app catalog", e);
+        }
+    }
+
+    private static LOCALSubmission getLocalJobSubmission(String submissionId) throws AppCatalogException {
+        try {
+            AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+            return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+            log.error(errorMsg, e);
+            throw new AppCatalogException(errorMsg, e);
+        }
+    }
+
+    public static UnicoreJobSubmission getUnicoreJobSubmission(String submissionId) throws AppCatalogException {
+        try {
+            AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+            return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
+            log.error(errorMsg, e);
+            throw new AppCatalogException(errorMsg, e);
+        }
+    }
+
+    public static SSHJobSubmission getSSHJobSubmission(String submissionId) throws AppCatalogException {
+        try {
+            AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+            return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+            log.error(errorMsg, e);
+            throw new AppCatalogException(errorMsg, e);
+        }
+    }
+
+    public static CloudJobSubmission getCloudJobSubmission(String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+            return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+            log.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
+    private static String listToCsv(List<String> listOfStrings, char separator) {
+        StringBuilder sb = new StringBuilder();
+
+        // all but last
+        for (int i = 0; i < listOfStrings.size() - 1; i++) {
+            sb.append(listOfStrings.get(i));
+            sb.append(separator);
+        }
+
+        // last string, no separator
+        if (listOfStrings.size() > 0) {
+            sb.append(listOfStrings.get(listOfStrings.size() - 1));
+        }
+
+        return sb.toString();
+    }
+
+    public static byte[] longToBytes(long x) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(x);
+        return buffer.array();
+    }
+
+    public static long bytesToLong(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.put(bytes);
+        buffer.flip();//need flip
+        return buffer.getLong();
+    }
+
+    private static boolean isValid(String str) {
+        return str != null && !str.isEmpty();
+    }
+
+    private static void setMailAddresses(ProcessContext processContext, GroovyMap groovyMap)
+            throws WorkerException, AppCatalogException, ApplicationSettingsException {
+
+        ProcessModel processModel =  processContext.getProcessModel();
+        String emailIds = null;
+        if (isEmailBasedJobMonitor(processContext)) {
+            emailIds = ServerSettings.getEmailBasedMonitorAddress();
+        }
+        if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+            String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+            if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+                if (emailIds != null && !emailIds.isEmpty()) {
+                    emailIds += ("," + userJobNotifEmailIds);
+                } else {
+                    emailIds = userJobNotifEmailIds;
+                }
+            }
+            if (processModel.isEnableEmailNotification()) {
+                List<String> emailList = processModel.getEmailAddresses();
+                String elist = JobSubmissionUtils.listToCsv(emailList, ',');
+                if (elist != null && !elist.isEmpty()) {
+                    if (emailIds != null && !emailIds.isEmpty()) {
+                        emailIds = emailIds + "," + elist;
+                    } else {
+                        emailIds = elist;
+                    }
+                }
+            }
+        }
+        if (emailIds != null && !emailIds.isEmpty()) {
+            log.info("Email list: " + emailIds);
+            groovyMap.add(Script.MAIL_ADDRESS, emailIds);
+        }
+    }
+
+    private static List<String> getProcessOutputValues(List<OutputDataObjectType> processOutputs) {
+        List<String> inputValues = new ArrayList<>();
+        if (processOutputs != null) {
+            for (OutputDataObjectType output : processOutputs) {
+                if (output.getApplicationArgument() != null
+                        && !output.getApplicationArgument().equals("")) {
+                    inputValues.add(output.getApplicationArgument());
+                }
+                if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+                    if (output.getType() == DataType.URI) {
+                        String filePath = output.getValue();
+                        filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                        inputValues.add(filePath);
+                    }
+                }
+            }
+        }
+        return inputValues;
+    }
+
+    private static List<String> getProcessInputValues(List<InputDataObjectType> processInputs) {
+        List<String> inputValues = new ArrayList<String>();
+        if (processInputs != null) {
+
+            // sort the inputs first and then build the command ListR
+            Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+                @Override
+                public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+                    return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+                }
+            };
+            Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+            sortedInputSet.addAll(processInputs);
+
+            for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+                if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+                    continue;
+                }
+                if (inputDataObjectType.getApplicationArgument() != null
+                        && !inputDataObjectType.getApplicationArgument().equals("")) {
+                    inputValues.add(inputDataObjectType.getApplicationArgument());
+                }
+
+                if (inputDataObjectType.getValue() != null
+                        && !inputDataObjectType.getValue().equals("")) {
+                    if (inputDataObjectType.getType() == DataType.URI) {
+                        // set only the relative path
+                        String filePath = inputDataObjectType.getValue();
+                        filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                        inputValues.add(filePath);
+                    } else if (inputDataObjectType.getType() == DataType.URI_COLLECTION) {
+                        String filePaths = inputDataObjectType.getValue();
+                        String[] paths = filePaths.split(WorkerConstants.MULTIPLE_INPUTS_SPLITTER);
+                        String filePath;
+                        String inputs = "";
+                        int i = 0;
+                        for (; i < paths.length - 1; i++) {
+                            filePath = paths[i];
+                            filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                            // File names separate by a space
+                            inputs += filePath + " ";
+                        }
+                        inputs += paths[i];
+                        inputValues.add(inputs);
+                    } else {
+                        inputValues.add(inputDataObjectType.getValue());
+                    }
+
+                }
+            }
+        }
+        return inputValues;
+    }
+
+    private static String getQoS(String qualityOfService, String preferredBatchQueue) {
+        if(preferredBatchQueue == null  || preferredBatchQueue.isEmpty()
+                ||  qualityOfService == null  || qualityOfService.isEmpty()) return null;
+        final String qos = "qos";
+        Pattern pattern = Pattern.compile(preferredBatchQueue + "=(?<" + qos + ">[^,]*)");
+        Matcher matcher = pattern.matcher(qualityOfService);
+        if (matcher.find()) {
+            return matcher.group(qos);
+        }
+        return null;
+    }
+
+    private static int generateJobName() {
+        Random random = new Random();
+        int i = random.nextInt(Integer.MAX_VALUE);
+        i = i + 99999999;
+        if (i < 0) {
+            i = i * (-1);
+        }
+        return i;
+    }
+
+    private static String parseCommands(String value, GroovyMap bindMap) {
+        TemplateEngine templateEngine = new GStringTemplateEngine();
+        try {
+            return templateEngine.createTemplate(value).make(bindMap).toString();
+        } catch (ClassNotFoundException | IOException e) {
+            throw new IllegalArgumentException("Error while parsing command " + value
+                    + " , Invalid command or incomplete bind map");
+        }
+    }
+
+    private static String maxWallTimeCalculator(int maxWalltime) {
+        if (maxWalltime < 60) {
+            return "00:" + maxWalltime + ":00";
+        } else {
+            int minutes = maxWalltime % 60;
+            int hours = maxWalltime / 60;
+            return hours + ":" + minutes + ":00";
+        }
+    }
+
+    private static File getLocalDataDir(TaskContext taskContext) {
+        String outputPath = ServerSettings.getLocalDataLocation();
+        outputPath = (outputPath.endsWith(File.separator) ? outputPath : outputPath + File.separator);
+        return new File(outputPath + taskContext.getParentProcessContext() .getProcessId());
+    }
+
+    public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws WorkerException {
+        if(resourceJobManager == null)
+            return null;
+
+        ResourceConfig resourceConfig = JobSubmissionUtils.getResourceConfig(resourceJobManager.getResourceJobManagerType());
+        OutputParser outputParser;
+        try {
+            Class<? extends OutputParser> aClass = Class.forName(resourceConfig.getCommandOutputParser()).asSubclass
+                    (OutputParser.class);
+            outputParser = aClass.getConstructor().newInstance();
+        } catch (Exception e) {
+            throw new WorkerException("Error while instantiating output parser for " + resourceJobManager
+                    .getResourceJobManagerType().name());
+        }
+
+        String templateFileName = WorkerUtils.getTemplateFileName(resourceJobManager.getResourceJobManagerType());
+        switch (resourceJobManager.getResourceJobManagerType()) {
+            case PBS:
+                return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case SLURM:
+                return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager
+                        .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser);
+            case LSF:
+                return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case UGE:
+                return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case FORK:
+                return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            // We don't have a job configuration manager for CLOUD type
+            default:
+                return null;
+        }
+    }
+
+    private static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
+        return resources.get(resourceJobManagerType);
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/30e1f0ee/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/Script.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/Script.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/Script.java
new file mode 100644
index 0000000..776167f
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/Script.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.task.jobsubmission.utils;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public enum Script {
+
+    SHELL_NAME("shellName"),
+    QUEUE_NAME("queueName"),
+    NODES("nodes"),
+    CPU_COUNT("cpuCount"),
+    MAIL_ADDRESS("mailAddress"),
+    ACCOUNT_STRING("accountString"),
+    MAX_WALL_TIME("maxWallTime"),
+    JOB_NAME("jobName"),
+    STANDARD_OUT_FILE("standardOutFile"),
+    STANDARD_ERROR_FILE("standardErrorFile"),
+    QUALITY_OF_SERVICE("qualityOfService"),
+    RESERVATION("reservation"),
+    EXPORTS("exports"),
+    MODULE_COMMANDS("moduleCommands"),
+    SCRATCH_LOCATION("scratchLocation"),
+    WORKING_DIR("workingDirectory"),
+    PRE_JOB_COMMANDS("preJobCommands"),
+    JOB_SUBMITTER_COMMAND("jobSubmitterCommand"),
+    EXECUTABLE_PATH("executablePath"),
+    INPUTS("inputs"),
+    POST_JOB_COMMANDS("postJobCommands"),
+    USED_MEM("usedMem"),
+    PROCESS_PER_NODE("processPerNode"),
+    CHASSIS_NAME("chassisName"),
+    INPUT_DIR("inputDir"),
+    OUTPUT_DIR("outputDir"),
+    USER_NAME("userName"),
+    GATEWAY_ID("gatewayId"),
+    GATEWAY_USER_NAME("gatewayUserName"),
+    APPLICATION_NAME("applicationName"),
+    ;
+
+    String name;
+    Script(String name) {
+        this.name = name;
+    }
+}


Mime
View raw message