airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: fixing more amqp related issues
Date Thu, 02 Oct 2014 19:53:11 GMT
Repository: airavata
Updated Branches:
  refs/heads/master c6825295d -> 9258b9019


Fixing more AMQP-related issues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9258b901
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9258b901
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9258b901

Branch: refs/heads/master
Commit: 9258b90199cd59a281f2eb05b89ea0c9e558537e
Parents: c682529
Author: lahiru <lahiru@apache.org>
Authored: Thu Oct 2 15:53:07 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Thu Oct 2 15:53:07 2014 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java | 10 ++--
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 52 ++++++--------------
 2 files changed, 22 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/9258b901/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 3d86dff..1acc9d6 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -196,9 +196,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
     public boolean submitJob(String experimentId, String taskId, String gatewayId) throws
TException {
         logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId);
         GFac gfac = getGfac();
-        InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId,
taskId, gatewayId);
-        inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker));
-        return true;
+//        InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId,
taskId, gatewayId);
+        try {
+            return gfac.submitJob(experimentId, taskId, gatewayId);
+        } catch (GFacException e) {
+            throw new TException("Error launching the experiment : " + e.getMessage(), e);
+        }
+//        inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker));
     }
 
     public boolean cancelJob(String experimentId, String taskId) throws TException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/9258b901/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index a98c9c7..c027608 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -62,7 +62,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
 public class HPCPullMonitor extends PullMonitor {
     private final static Logger logger = LoggerFactory.getLogger(HPCPullMonitor.class);
-    public static final int FAILED_COUNT = 3;
+    public static final int FAILED_COUNT = 1;
 
     // I think this should use DelayedBlocking Queue to do the monitoring*/
     private BlockingQueue<UserMonitorData> queue;
@@ -194,7 +194,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 logger.info("ExperimentID: " + cancelMId.split("\\+")[0]
+ ",TaskID: " + cancelMId.split("\\+")[1] + "JobID" + iMonitorID.getJobID());
                                 iMonitorID.setStatus(JobState.CANCELED);
                                 completedJobs.put(iMonitorID.getJobName(), iMonitorID);
-                                iterator1.remove();
+                                cancelJobList.remove(cancelMId);
                                 break;
                             }
                         }
@@ -210,13 +210,10 @@ public class HPCPullMonitor extends PullMonitor {
                                     logger.info("This job is finished because push notification
came with <username,jobName> " + completeId);
                                     completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                                     iMonitorID.setStatus(JobState.COMPLETE);
+                                    completedJobsFromPush.remove(completeId);//we have to
make this empty everytime we iterate, otherwise this list will accumulate and will
+                                    // lead to a memory leak
                                     break;
                                 }
-                                //we have to make this empty everytime we iterate, otherwise
this list will accumulate and will
-                                // lead to a memory leak
-                            }
-                            if(completeId!=null) {
-                                completedJobsFromPush.remove(completeId);
                             }
                             iterator = completedJobsFromPush.listIterator();
                         }
@@ -244,14 +241,13 @@ public class HPCPullMonitor extends PullMonitor {
                                     " 3 times, so skip this Job from Monitor");
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                             JobDescriptor jobDescriptor = JobDescriptor.fromXML(iMonitorID.getJobExecutionContext().getJobDetails().getJobDescription());
-                            List<String> stdOut = connection.getCluster().listDirectory(jobDescriptor.getOutputDirectory());
-                            if (stdOut.size() > 0) {
-                                if (stdOut.contains(jobDescriptor.getStandardErrorFile())
&& stdOut.contains(jobDescriptor.getStandardOutFile())) {
-                                    completedJobs.put(iMonitorID.getJobName(), iMonitorID);
-                                } else {
-                                    iMonitorID.setFailedCount(0);
-                                }
+                            List<String> stdOut = connection.getCluster().listDirectory(jobDescriptor.getOutputDirectory());
// check the outputs directory
+                            if (stdOut.size() > 0) { // have to be careful with this
+                                completedJobs.put(iMonitorID.getJobName(), iMonitorID);
+                            } else {
+                                iMonitorID.setFailedCount(0);
                             }
+
                         } else {
                             // Evey
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
@@ -308,31 +304,13 @@ public class HPCPullMonitor extends PullMonitor {
                 publisher.publish(jobStatus);
             } else if (e.getMessage().contains("illegally formed job identifier")) {
                 logger.error("Wrong job ID is given so dropping the job from monitoring system");
-            } else if (!this.queue.contains(take)) {   // we put the job back to the queue
only if its state is not unknown
-                if (currentMonitorID == null) {
-                    logger.error("Monitoring the jobs failed, for user: " + take.getUserName()
-                            + " in Host: " + currentHostDescription.getType().getHostAddress());
-                } else {
-                    if (currentMonitorID != null) {
-                        if (currentMonitorID.getFailedCount() < 2) {
-                            try {
-                                currentMonitorID.setFailedCount(currentMonitorID.getFailedCount()
+ 1);
-                                this.queue.put(take);
-                            } catch (InterruptedException e1) {
-                                e1.printStackTrace();
-                            }
-                        } else {
-                            logger.error(e.getMessage());
-                            logger.error("Tried to monitor the job 3 times, so dropping of
the the Job with ID: " + currentMonitorID.getJobID());
-                        }
-                    }
+            } else if (!this.queue.contains(take)) {
+                try {
+                    queue.put(take);
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();
                 }
             }
-            try {
-                queue.put(take);
-            } catch (InterruptedException e1) {
-                e1.printStackTrace();
-            }
             throw new AiravataMonitorException("Error retrieving the job status", e);
         } catch (Exception e) {
             try {


Mime
View raw message