airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/2] git commit: committing more changes with orchestrator-registry integration - AIRAVATA-1028
Date Mon, 03 Mar 2014 17:08:56 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 0a470ce90 -> 22cd1a091


Commit more changes for the orchestrator–registry integration (AIRAVATA-1028).


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f8457a7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f8457a7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f8457a7

Branch: refs/heads/master
Commit: 3f8457a73cac42f923bdfc2bedf7925b4b09cdfd
Parents: a4c0adc
Author: lahiru <lahiru@apache.org>
Authored: Mon Mar 3 12:05:35 2014 -0500
Committer: lahiru <lahiru@apache.org>
Committed: Mon Mar 3 12:05:35 2014 -0500

----------------------------------------------------------------------
 .../job/monitor/AiravataJobStatusUpdator.java   |  4 +-
 .../apache/airavata/job/monitor/MonitorID.java  | 10 +++
 .../monitor/impl/pull/qstat/QstatMonitor.java   | 75 ++++++++++++-------
 .../main/resources/schemas/HostDescription.xsd  |  1 +
 .../main/resources/airavata-server.properties   |  2 +-
 .../src/main/resources/monitor.properties       |  2 +-
 .../client/OrchestratorClientFactoryTest.java   | 79 ++++++++++++--------
 7 files changed, 110 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index f7afc4d..0a0fde5 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -66,11 +66,11 @@ public class AiravataJobStatusUpdator{
         JobState state = jobStatus.getState();
         switch (state) {
             case COMPLETE:
-                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is DONE");
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE");
                 jobsToMonitor.remove(jobStatus.getMonitorID());
                 break;
             case UNKNOWN:
-                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is UNKNOWN");
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
                 logger.info("Unknown job status came, if the old job status is RUNNING or
something active, we have to make it complete");
                 //todo implement this logic
                 break;

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index 06d04ac..f33d348 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -58,6 +58,8 @@ public class MonitorID {
 
     private String taskID;
 
+    private int failedCount = 0;
+
 
     public MonitorID(HostDescription host, String jobID,String taskID,String experimentID,
String userName) {
         this.host = host;
@@ -170,4 +172,12 @@ public class MonitorID {
     public void setTaskID(String taskID) {
         this.taskID = taskID;
     }
+
+    public int getFailedCount() {
+        return failedCount;
+    }
+
+    public void setFailedCount(int failedCount) {
+        this.failedCount = failedCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index eba99f9..8f0b79d 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.job.monitor.impl.pull.qstat;
 
+import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PullMonitor;
@@ -27,6 +28,8 @@ import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
 import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,10 +74,11 @@ public class QstatMonitor extends PullMonitor implements Runnable {
                 startPulling();
                 // After finishing one iteration of the full queue this thread sleeps 1 second
                 Thread.sleep(1000);
-            } catch (AiravataMonitorException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings
| File Templates.
-            } catch (InterruptedException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings
| File Templates.
+            } catch (Exception e){
+                // we catch all the exceptions here because no matter what happens we do
not stop running this
+                // thread, but ideally we should report proper error messages, but this is
handled in startPulling
+                // method, incase something happen in Thread.sleep we handle it with this
catch block.
+                logger.error(e.getMessage());
             }
         }
     }
@@ -106,24 +110,26 @@ public class QstatMonitor extends PullMonitor implements Runnable {
                         this.queue.put(take);
                     }
                 }
-                if(take.getLastMonitored() == null || ((monitorDiff/1000) >= 5)){
-                        String hostName = take.getHost().getType().getHostAddress();
-                        ResourceConnection connection = null;
-                        if (connections.containsKey(hostName)) {
-                            logger.debug("We already have this connection so not going to
create one");
-                            connection = connections.get(hostName);
-                        } else {
-                            connection = new ResourceConnection(take, "/opt/torque/bin");
-                        }
-                        jobStatus.setMonitorID(take);
-                        jobStatus.setState(connection.getJobStatus(take));
-                        publisher.publish(jobStatus);
-                        // if the job is completed we do not have to put the job to the queue
again
-                        if (!jobStatus.getState().equals(JobState.COMPLETE)) {
-                            take.setLastMonitored(new Timestamp((new Date()).getTime()));
-                            this.queue.put(take);
-                        }
+                if (take.getLastMonitored() == null || ((monitorDiff / 1000) >= 5)) {
+                    GsisshHostType gsisshHostType = (GsisshHostType) take.getHost().getType();
+                    String hostName = gsisshHostType.getHostAddress();
+                    ResourceConnection connection = null;
+                    if (connections.containsKey(hostName)) {
+                        logger.debug("We already have this connection so not going to create
one");
+                        connection = connections.get(hostName);
+                    } else {
+                        connection = new ResourceConnection(take, gsisshHostType.getInstalledPath());
+                        connections.put(hostName, connection);
                     }
+                    jobStatus.setMonitorID(take);
+                    jobStatus.setState(connection.getJobStatus(take));
+                    publisher.publish(jobStatus);
+                    // if the job is completed we do not have to put the job to the queue
again
+                    if (!jobStatus.getState().equals(JobState.COMPLETE)) {
+                        take.setLastMonitored(new Timestamp((new Date()).getTime()));
+                        this.queue.put(take);
+                    }
+                }
             } catch (InterruptedException e) {
                 if(!this.queue.contains(take)){
                     try {
@@ -141,16 +147,31 @@ public class QstatMonitor extends PullMonitor implements Runnable {
                     publisher.publish(jobStatus);
                 }else if(e.getMessage().contains("illegally formed job identifier")){
                    logger.error("Wrong job ID is given so dropping the job from monitoring
system");
-                }
-                else if(!this.queue.contains(take)){   // we put the job back to the queue
only if its state is not unknown
-                    try {
-                        this.queue.put(take);
-                    } catch (InterruptedException e1) {
-                        e1.printStackTrace();  //To change body of catch statement use File
| Settings | File Templates.
+                } else if (!this.queue.contains(take)) {   // we put the job back to the
queue only if its state is not unknown
+                    if (take.getFailedCount() < 3) {
+                        try {
+                            take.setFailedCount(take.getFailedCount() + 1);
+                            this.queue.put(take);
+                        } catch (InterruptedException e1) {
+                            e1.printStackTrace();
+                        }
+                    } else {
+                        logger.error("Tried to monitor the job 3 times, so dropping of the
the Job with ID: " + take.getJobID());
                     }
                 }
                 logger.error("Error retrieving the job status");
                 throw new AiravataMonitorException("Error retrieving the job status", e);
+            } catch (Exception e){
+                if (take.getFailedCount() < 3) {
+                        try {
+                            take.setFailedCount(take.getFailedCount() + 1);
+                            this.queue.put(take);
+                        } catch (InterruptedException e1) {
+                            e1.printStackTrace();
+                        }
+                    } else {
+                        logger.error("Tryied to monitor the job 3 times, so dropping of the
the Job with ID: " + take.getJobID());
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
----------------------------------------------------------------------
diff --git a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
index 45ac43b..27dbed1 100644
--- a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
+++ b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
@@ -109,6 +109,7 @@
                     <element name="exports" type="gfac:exportProperties" minOccurs="0"
maxOccurs="1"/>
                     <element name="preJobCommands" type="xsd:string" minOccurs="0" maxOccurs="unbounded"/>
                     <element name="postJobCommands" type="xsd:string" minOccurs="0" maxOccurs="unbounded"/>
+                    <element name="installedPath" type="xsd:string" minOccurs="0" maxOccurs="1"
default="/opt/torque/bin"/>
                 </sequence>
             </extension>
         </complexContent>

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties
b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties
index 1a3967f..f5ea35f 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/airavata-server.properties
@@ -100,7 +100,7 @@ gfac.embedded=true
 
 myproxy.server=myproxy.teragrid.org
 myproxy.user=ogce
-myproxy.pass=
+myproxy.pass=0Gce3098
 myproxy.life=3600
 # XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
 trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
index dc4ebdc..32b55a3 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
@@ -6,5 +6,5 @@ trusted.certificate.location=/Users/lahirugunathilake/Downloads/certificates
 certificate.path=/Users/lahirugunathilake/Downloads/certificates
 myproxy.server=myproxy.teragrid.org
 myproxy.user=ogce
-myproxy.password=
+myproxy.password=0Gce3098
 myproxy.life=3600
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/3f8457a7/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
b/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
index a20c80e..4a1bc11 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/test/java/org/apache/airavata/orchestrator/client/OrchestratorClientFactoryTest.java
@@ -39,6 +39,7 @@ import org.apache.airavata.registry.cpi.CompositeIdentifier;
 import org.apache.airavata.registry.cpi.ParentDataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.schemas.gfac.DataType;
+import org.apache.thrift.TException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -49,9 +50,9 @@ public class OrchestratorClientFactoryTest {
     private DocumentCreator documentCreator;
     private OrchestratorService.Client orchestratorClient;
     private Registry registry;
-
+    private int NUM_CONCURRENT_REQUESTS = 1;
     @Before
-    public void setUp(){
+    public void setUp() {
         orchestratorClient = OrchestratorClientFactory.createOrchestratorClient("localhost",
8940);
         registry = RegistryFactory.getDefaultRegistry();
         AiravataUtils.setExecutionAsServer();
@@ -63,7 +64,6 @@ public class OrchestratorClientFactoryTest {
 
     private AiravataAPI getAiravataAPI() {
         AiravataAPI airavataAPI = null;
-        if (airavataAPI == null) {
             try {
                 String systemUserName = ServerSettings.getSystemUser();
                 String gateway = ServerSettings.getSystemUserGateway();
@@ -73,46 +73,61 @@ public class OrchestratorClientFactoryTest {
             } catch (AiravataAPIInvocationException e) {
                 e.printStackTrace();
             }
-        }
         return airavataAPI;
     }
 
-    private void storeDescriptors(){
+    private void storeDescriptors() {
 
     }
 
     @Test
-    public void storeExperimentDetail(){
-        try{
-            List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
-            DataObjectType input = new DataObjectType();
-            input.setKey("echo_input");
-            input.setType(DataType.STRING.toString());
-            input.setValue("echo_output=Hello World");
-            exInputs.add(input);
-
+    public void storeExperimentDetail() {
+            for (int i = 0; i < NUM_CONCURRENT_REQUESTS; i++) {
+                Thread thread = new Thread() {
+                    public void run() {
+                        List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
+                        DataObjectType input = new DataObjectType();
+                        input.setKey("echo_input");
+                        input.setType(DataType.STRING.toString());
+                        input.setValue("echo_output=Hello World");
+                        exInputs.add(input);
 
-            List<DataObjectType> exOut = new ArrayList<DataObjectType>();
-            DataObjectType output = new DataObjectType();
-            output.setKey("echo_output");
-            output.setType(DataType.STRING.toString());
-            output.setValue("");
-            exOut.add(output);
 
-            Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("project1",
"admin", "echoExperiment", "SimpleEcho2", "SimpleEcho2", exInputs);
-            simpleExperiment.setExperimentOutputs(exOut);
+                        List<DataObjectType> exOut = new ArrayList<DataObjectType>();
+                        DataObjectType output = new DataObjectType();
+                        output.setKey("echo_output");
+                        output.setType(DataType.STRING.toString());
+                        output.setValue("");
+                        exOut.add(output);
 
-            ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling("trestles.sdsc.edu",
1, 1, 1, "normal", 0, 0, 1, "sds128");
-            scheduling.setResourceHostId("gsissh-trestles");
-            UserConfigurationData userConfigurationData = new UserConfigurationData();
-            userConfigurationData.setComputationalResourceScheduling(scheduling);
-            simpleExperiment.setUserConfigurationData(userConfigurationData);
-            String expId = (String)registry.add(ParentDataType.EXPERIMENT, simpleExperiment);
+                        Experiment simpleExperiment = ExperimentModelUtil.createSimpleExperiment("project1",
"admin", "echoExperiment", "SimpleEcho2", "SimpleEcho2", exInputs);
+                        simpleExperiment.setExperimentOutputs(exOut);
 
-            orchestratorClient.launchExperiment(expId);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+                        ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling("trestles.sdsc.edu",
1, 1, 1, "normal", 0, 0, 1, "sds128");
+                        scheduling.setResourceHostId("gsissh-trestles");
+                        UserConfigurationData userConfigurationData = new UserConfigurationData();
+                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                        String expId = null;
+                        try {
+                            expId = (String) registry.add(ParentDataType.EXPERIMENT, simpleExperiment);
+                        } catch (Exception e) {
+                            e.printStackTrace();  //To change body of catch statement use
File | Settings | File Templates.
+                        }
 
+                        try {
+                            orchestratorClient.launchExperiment(expId);
+                        } catch (TException e) {
+                            e.printStackTrace();  //To change body of catch statement use
File | Settings | File Templates.
+                        }
+                    }
+                };
+                thread.start();
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();  //To change body of catch statement use File |
Settings | File Templates.
+                }
+            }
     }
 }


Mime
View raw message