airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramin...@apache.org
Subject [1/2] airavata git commit: To fix AIRAVATA-1476 for unicore provider and added rabbitmq messaging.
Date Wed, 13 May 2015 02:42:58 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 624dd4149 -> ea93cc1ef


To fix AIRAVATA-1476 for unicore provider and added rabbitmq messaging. 

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d076fa8b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d076fa8b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d076fa8b

Branch: refs/heads/master
Commit: d076fa8bc928d8161a3483cc883108fd8e79f071
Parents: 1b84883
Author: raminder <raminder@apache.org>
Authored: Tue May 12 17:09:13 2015 -0400
Committer: raminder <raminder@apache.org>
Committed: Tue May 12 17:09:13 2015 -0400

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |  4 ++
 .../gfac/bes/provider/impl/BESProvider.java     | 50 +++++++++++---------
 .../gfac/bes/utils/DataTransferrer.java         | 14 ++++--
 .../gfac/core/provider/AbstractProvider.java    |  3 --
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    | 44 ++++++++++-------
 5 files changed, 68 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index c5d6a3f..27e962e 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -150,6 +150,10 @@ gfac.passive=true
 ### Incase of password authentication. 
 #ssh.password=Password for ssh connection
 
+################ ---------- BES Properties ------------------- ###############
+#bes.ca.cert.path=<location>/certificates/cacert.pem
+#bes.ca.key.path=<location>/certificates/cakey.pem
+#bes.ca.key.pass=passphrase
 
 
 ###########################################################################

http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 2f6add6..73bf0fc 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -34,6 +34,7 @@ import org.apache.airavata.gfac.bes.utils.JSDLGenerator;
 import org.apache.airavata.gfac.bes.utils.SecurityUtils;
 import org.apache.airavata.gfac.bes.utils.StorageCreator;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.notification.events.StatusChangeEvent;
 import org.apache.airavata.gfac.core.notification.events.UnicoreJobIDEvent;
@@ -44,6 +45,8 @@ import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.xmlbeans.XmlCursor;
@@ -147,7 +150,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
 
             log.info(String.format("Activity Submitting to %s ... \n",
                     factoryUrl));
-            jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+            monitorPublisher.publish(new StartExecutionEvent());
             CreateActivityResponseDocument response = factory.createActivity(cad);
             log.info(String.format("Activity Submitted to %s \n", factoryUrl));
 
@@ -162,22 +165,15 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
                         .toString();
             }
             log.info("JobID: " + jobId);
-            jobDetails.setJobID(activityEpr.toString());
+            jobDetails.setJobID(jobId);
             jobDetails.setJobDescription(activityEpr.toString());
 
             jobExecutionContext.setJobDetails(jobDetails);
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
             log.info(formatStatusMessage(activityEpr.getAddress()
                     .getStringValue(), factory.getActivityStatus(activityEpr)
                     .toString()));
-
-            jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
-//            GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
-
             
-            log.info(formatStatusMessage(activityEpr.getAddress()
-                    .getStringValue(), factory.getActivityStatus(activityEpr)
-                    .toString()));
-
             waitUntilDone(factory, activityEpr, jobDetails);
 
             ActivityStatusType activityStatus = null;
@@ -196,10 +192,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
                 log.info(error);
   
                 JobState applicationJobStatus = JobState.FAILED;
-                String jobStatusMessage = "Status of job " + jobId + "is "
-                        + applicationJobStatus;
-                jobExecutionContext.getNotifier().publish(
-                        new StatusChangeEvent(jobStatusMessage));
+                sendNotification(jobExecutionContext,applicationJobStatus);
                 GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
                 try {Thread.sleep(5000);} catch (InterruptedException e) {}
                 
@@ -209,16 +202,16 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
                 
             } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
                 JobState applicationJobStatus = JobState.CANCELED;
-                String jobStatusMessage = "Status of job " + jobId + "is "
-                        + applicationJobStatus;
-                jobExecutionContext.getNotifier().publish(
-                        new StatusChangeEvent(jobStatusMessage));
+                sendNotification(jobExecutionContext,applicationJobStatus);
                 GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
                 throw new GFacProviderException(
                         jobExecutionContext.getExperimentID() + "Job Canceled");
             } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
                 try {
                     Thread.sleep(5000);
+                    JobState applicationJobStatus = JobState.COMPLETE;
+                    sendNotification(jobExecutionContext,applicationJobStatus);
+                    
                 } catch (InterruptedException e) {
                 }
                 if (activityStatus.getExitCode() == 0) {
@@ -432,10 +425,8 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
 	
 	            ActivityStatusType activityStatus = getStatus(factory, activityEpr);
 	            JobState applicationJobStatus = getApplicationJobStatus(activityStatus);
-	            String jobStatusMessage = "Status of job " + jobId + "is " + applicationJobStatus;
-//	            GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
-	
-	            jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
+	         
+	            sendNotification(jobExecutionContext,applicationJobStatus);
 	
 	            // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId,
 	            // applicationJobStatus);
@@ -444,9 +435,24 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
 	            } catch (InterruptedException e) {}
 	            continue;
 	        }
+			return;
 		} catch(Exception e) {
 			log.error("Error monitoring job status..");
 			throw e;
 		}
 	}
+    private void sendNotification(JobExecutionContext jobExecutionContext,  JobState status) {
+        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+        JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
+        		jobExecutionContext.getTaskData().getTaskID(),
+        		jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+        		jobExecutionContext.getExperimentID(),
+        		jobExecutionContext.getGatewayID());
+        jobStatus.setJobIdentity(jobIdentity);
+        jobStatus.setState(status);
+        log.debug(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
+                "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
+        jobStatus.getJobIdentity().getTaskId());
+        monitorPublisher.publish(jobStatus);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
index d7f3244..453e45a 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -209,10 +209,14 @@ public class DataTransferrer {
 	
 	private String getDownloadLocation() {
 		TaskDetails taskData = jobContext.getTaskData();
-		if (taskData != null && taskData.getAdvancedOutputDataHandling() != null) {
-			String outputDataDirectory = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
-			return outputDataDirectory;
-		}
-		return null;
+		//In case of third party transfer this will not work.
+//		if (taskData != null && taskData.getAdvancedOutputDataHandling() != null) {
+//			String outputDataDirectory = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+//			return outputDataDirectory;
+//		}
+		String outputDataDir = File.separator + "tmp";
+        outputDataDir = outputDataDir + File.separator + jobContext.getExperimentID();
+        (new File(outputDataDir)).mkdirs();
+		return outputDataDir;
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
index 3282b5a..b650482 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
@@ -45,9 +45,6 @@ public abstract class AbstractProvider implements GFacProvider{
 
     protected AbstractProvider() {                                            //todo this has to be fixed
         this.monitorPublisher = BetterGfacImpl.getMonitorPublisher();
-        if(this.monitorPublisher == null){
-            this.monitorPublisher = BetterGfacImpl.getMonitorPublisher();
-        }
     }
 
     public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index bb580fb..eb0b811 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -207,7 +207,12 @@ public class GFACSSHUtils {
             SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
             AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
             JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
-            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
+            SSHJobSubmission sshJobSubmission = null;
+			try {
+				sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
+			} catch (Exception e1) {
+				 logger.error("Not able to get SSHJobSubmission from registry");
+			}
 
             Cluster pbsCluster = null;
             String key=sshAuth.getKey();
@@ -239,25 +244,30 @@ public class GFACSSHUtils {
                 }
                 if (recreate) {
                	 JobManagerConfiguration jConfig = null;
-                 String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
+               	 String installedParentPath = null;
+               	 if(jobExecutionContext.getResourceJobManager()!= null){
+               		installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
+               	 }
                  if (installedParentPath == null) {
                      installedParentPath = "/";
                  }
-                 String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
-                 if (jobManager == null) {
-                     logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
-                     jConfig = CommonUtils.getPBSJobManager(installedParentPath);
-                 } else {
-                     if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
-                         jConfig = CommonUtils.getPBSJobManager(installedParentPath);
-                     } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
-                         jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
-                     } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
-                         jConfig = CommonUtils.getSGEJobManager(installedParentPath);
-                     } else if(LSF_JOB_MANAGER.equals(jobManager)) {
-                         jConfig = CommonUtils.getLSFJobManager(installedParentPath);
-                     }
-                 }
+					if (sshJobSubmission != null) {
+						String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
+						if (jobManager == null) {
+							logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+							jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+						} else {
+							if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+								jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+							} else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+								jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+							} else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+								jConfig = CommonUtils.getSGEJobManager(installedParentPath);
+							} else if (LSF_JOB_MANAGER.equals(jobManager)) {
+								jConfig = CommonUtils.getLSFJobManager(installedParentPath);
+							}
+						}
+					}
                     pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),jConfig);
                     key = sshAuth.getKey();
                     List<Cluster> pbsClusters = null;


Mime
View raw message