Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4AF0217E9F for ; Thu, 2 Oct 2014 19:53:12 +0000 (UTC) Received: (qmail 54036 invoked by uid 500); 2 Oct 2014 19:53:12 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 53997 invoked by uid 500); 2 Oct 2014 19:53:12 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 53988 invoked by uid 99); 2 Oct 2014 19:53:11 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Oct 2014 19:53:11 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B4996A1771A; Thu, 2 Oct 2014 19:53:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lahiru@apache.org To: commits@airavata.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: fixing more amqp related issues Date: Thu, 2 Oct 2014 19:53:11 +0000 (UTC) Repository: airavata Updated Branches: refs/heads/master c6825295d -> 9258b9019 fixing more amqp related issues Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9258b901 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9258b901 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9258b901 Branch: refs/heads/master Commit: 9258b90199cd59a281f2eb05b89ea0c9e558537e Parents: c682529 Author: lahiru Authored: Thu Oct 2 15:53:07 2014 -0400 Committer: lahiru Committed: Thu Oct 2 15:53:07 2014 -0400 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 10 ++-- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 52 ++++++-------------- 2 files changed, 22 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/9258b901/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 3d86dff..1acc9d6 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -196,9 +196,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException { logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId); GFac gfac = getGfac(); - InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); - inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker)); - return true; +// InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); + try { + return gfac.submitJob(experimentId, taskId, gatewayId); + } catch (GFacException e) { + throw new TException("Error launching the experiment : " + e.getMessage(), e); + } +// inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker)); } public boolean cancelJob(String experimentId, String taskId) throws TException { http://git-wip-us.apache.org/repos/asf/airavata/blob/9258b901/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index a98c9c7..c027608 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -62,7 +62,7 @@ import java.util.concurrent.LinkedBlockingQueue; */ public class HPCPullMonitor extends PullMonitor { private final static Logger logger = LoggerFactory.getLogger(HPCPullMonitor.class); - public static final int FAILED_COUNT = 3; + public static final int FAILED_COUNT = 1; // I think this should use DelayedBlocking Queue to do the monitoring*/ private BlockingQueue queue; @@ -194,7 +194,7 @@ public class HPCPullMonitor extends PullMonitor { logger.info("ExperimentID: " + cancelMId.split("\\+")[0] + ",TaskID: " + cancelMId.split("\\+")[1] + "JobID" + iMonitorID.getJobID()); iMonitorID.setStatus(JobState.CANCELED); completedJobs.put(iMonitorID.getJobName(), iMonitorID); - iterator1.remove(); + cancelJobList.remove(cancelMId); break; } } @@ -210,13 +210,10 @@ public class HPCPullMonitor extends PullMonitor { logger.info("This job is finished because push notification came with " + completeId); completedJobs.put(iMonitorID.getJobName(), iMonitorID); iMonitorID.setStatus(JobState.COMPLETE); + completedJobsFromPush.remove(completeId);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will + // lead to a memory leak break; } - //we have to make this empty everytime we iterate, otherwise this list will accumulate and will - // lead to a memory leak - } - if(completeId!=null) { - completedJobsFromPush.remove(completeId); } iterator = completedJobsFromPush.listIterator(); } @@ -244,14 +241,13 @@ public class HPCPullMonitor extends PullMonitor { " 3 times, so skip this Job from Monitor"); iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); JobDescriptor jobDescriptor = JobDescriptor.fromXML(iMonitorID.getJobExecutionContext().getJobDetails().getJobDescription()); - List stdOut = connection.getCluster().listDirectory(jobDescriptor.getOutputDirectory()); - if (stdOut.size() > 0) { - if (stdOut.contains(jobDescriptor.getStandardErrorFile()) && stdOut.contains(jobDescriptor.getStandardOutFile())) { - completedJobs.put(iMonitorID.getJobName(), iMonitorID); - } else { - iMonitorID.setFailedCount(0); - } + List stdOut = connection.getCluster().listDirectory(jobDescriptor.getOutputDirectory()); // check the outputs directory + if (stdOut.size() > 0) { // have to be careful with this + completedJobs.put(iMonitorID.getJobName(), iMonitorID); + } else { + iMonitorID.setFailedCount(0); } + } else { // Evey iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); @@ -308,31 +304,13 @@ public class HPCPullMonitor extends PullMonitor { publisher.publish(jobStatus); } else if (e.getMessage().contains("illegally formed job identifier")) { logger.error("Wrong job ID is given so dropping the job from monitoring system"); - } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown - if (currentMonitorID == null) { - logger.error("Monitoring the jobs failed, for user: " + take.getUserName() - + " in Host: " + currentHostDescription.getType().getHostAddress()); - } else { - if (currentMonitorID != null) { - if (currentMonitorID.getFailedCount() < 2) { - try { - currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1); - this.queue.put(take); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } else { - logger.error(e.getMessage()); - logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID()); - } - } + } else if (!this.queue.contains(take)) { + try { + queue.put(take); + } catch (InterruptedException e1) { + e1.printStackTrace(); } } - try { - queue.put(take); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } throw new AiravataMonitorException("Error retrieving the job status", e); } catch (Exception e) { try {