Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 19BAB17CB2 for ; Tue, 11 Nov 2014 19:10:49 +0000 (UTC) Received: (qmail 52120 invoked by uid 500); 11 Nov 2014 19:10:49 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 52007 invoked by uid 500); 11 Nov 2014 19:10:48 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 51795 invoked by uid 99); 11 Nov 2014 19:10:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 19:10:48 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 858B6A0D94C; Tue, 11 Nov 2014 19:10:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chathuri@apache.org To: commits@airavata.apache.org Date: Tue, 11 Nov 2014 19:10:59 -0000 Message-Id: <47d06dd618e8481da0bddec9d53109f3@git.apache.org> In-Reply-To: <89e993ca943b405a81a6a4fe3b5c786e@git.apache.org> References: <89e993ca943b405a81a6a4fe3b5c786e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/16] airavata git commit: fixing concurrent modification exception in monitoring fixing concurrent modification exception in monitoring Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/af73c444 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/af73c444 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/af73c444 Branch: refs/heads/gfac_appcatalog_int Commit: af73c44468149b23053da8dc3247e78894fd751d Parents: 
97b384b Author: Ginnaliya Gamathige Authored: Tue Nov 11 10:50:02 2014 -0500 Committer: Ginnaliya Gamathige Committed: Tue Nov 11 10:50:02 2014 -0500 ---------------------------------------------------------------------- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 66 +++++++++++--------- .../airavata/gfac/monitor/util/CommonUtils.java | 6 +- 2 files changed, 36 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 8e5f758..66cc5f7 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -158,21 +158,22 @@ public class HPCPullMonitor extends PullMonitor { try { take = this.queue.take(); List hostMonitorData = take.getHostMonitorData(); - for (HostMonitorData iHostMonitorData : hostMonitorData) { + for (ListIterator hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) { + HostMonitorData iHostMonitorData = hostIterator.next(); if (iHostMonitorData.getHost().getType() instanceof GsisshHostType || iHostMonitorData.getHost().getType() instanceof SSHHostType) { - String hostName = iHostMonitorData.getHost().getType().getHostAddress(); + String hostName = iHostMonitorData.getHost().getType().getHostAddress(); ResourceConnection connection = null; if (connections.containsKey(hostName)) { - if(!connections.get(hostName).isConnected()){ - connection = 
new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); + if (!connections.get(hostName).isConnected()) { + connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); connections.put(hostName, connection); - }else{ + } else { logger.debug("We already have this connection so not going to create one"); connection = connections.get(hostName); } } else { - connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); + connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); connections.put(hostName, connection); } @@ -180,18 +181,18 @@ public class HPCPullMonitor extends PullMonitor { List monitorID = iHostMonitorData.getMonitorIDs(); Iterator iterator1 = cancelJobList.iterator(); ListIterator monitorIDListIterator = monitorID.listIterator(); - while (monitorIDListIterator.hasNext()){ + while (monitorIDListIterator.hasNext()) { MonitorID iMonitorID = monitorIDListIterator.next(); - while(iterator1.hasNext()) { + while (iterator1.hasNext()) { String cancelMId = iterator1.next(); if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { iMonitorID.setStatus(JobState.CANCELED); - CommonUtils.removeMonitorFromQueue(take,iMonitorID); + CommonUtils.removeMonitorFromQueue(take, iMonitorID); logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " + "completed job queue, experiment {}, task {} , job {}", iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID()); logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); sendNotification(iMonitorID); monitorIDListIterator.remove(); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); @@ -201,22 +202,19 @@ public 
class HPCPullMonitor extends PullMonitor { iterator1 = cancelJobList.iterator(); } synchronized (completedJobsFromPush) { - ListIterator iterator = completedJobsFromPush.listIterator(); - monitorIDListIterator = monitorID.listIterator(); - while (monitorIDListIterator.hasNext()) { - MonitorID iMonitorID = monitorIDListIterator.next(); - String completeId = null; - while (iterator.hasNext()) { - completeId = iterator.next(); + for (ListIterator iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) { + String completeId = iterator.next(); + for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) { + MonitorID iMonitorID = monitorIDListIterator.next(); if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { logger.info("This job is finished because push notification came with " + completeId); iMonitorID.setStatus(JobState.COMPLETE); - CommonUtils.removeMonitorFromQueue(take,iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak + CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak logger.debugId(completeId, "Push notification updated job {} status to {}. 
" + "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); iterator.remove(); sendNotification(iMonitorID); @@ -224,36 +222,34 @@ public class HPCPullMonitor extends PullMonitor { break; } } - iterator = completedJobsFromPush.listIterator(); } } // we have to get this again because we removed the already completed jobs with amqp messages monitorID = iHostMonitorData.getMonitorIDs(); Map jobStatuses = connection.getJobStatuses(monitorID); - Iterator iterator = monitorID.listIterator(); - while (iterator.hasNext()) { + for (Iterator iterator = monitorID.listIterator(); iterator.hasNext(); ) { MonitorID iMonitorID = iterator.next(); currentMonitorID = iMonitorID; - if (!JobState.CANCELED.equals(iMonitorID.getStatus())&& + if (!JobState.CANCELED.equals(iMonitorID.getStatus()) && !JobState.COMPLETE.equals(iMonitorID.getStatus())) { iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic - }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ + } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) { logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); - CommonUtils.removeMonitorFromQueue(take,iMonitorID); + CommonUtils.removeMonitorFromQueue(take, iMonitorID); logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + 
iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } - iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic + iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); sendNotification(iMonitorID); // if the job is completed we do not have to put the job to the queue again iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); } - iterator = monitorID.listIterator(); - while(iterator.hasNext()){ + + for (Iterator iterator = monitorID.listIterator(); iterator.hasNext(); ) { MonitorID iMonitorID = iterator.next(); if (iMonitorID.getFailedCount() > FAILED_COUNT) { iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); @@ -276,9 +272,9 @@ public class HPCPullMonitor extends PullMonitor { " Experiment {} , task {}", iMonitorID.getFailedCount(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); sendNotification(iMonitorID); - CommonUtils.removeMonitorFromQueue(take,iMonitorID); + CommonUtils.removeMonitorFromQueue(take, iMonitorID); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } else { iMonitorID.setFailedCount(0); @@ -302,6 +298,14 @@ public class HPCPullMonitor extends PullMonitor { // during individual monitorID removal we remove the HostMonitorData object if it become empty // so if all the jobs are 
finished for all the hostMOnitorId objects in userMonitorData object // we should remove it from the queue so here we do not put it back. + for (ListIterator iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) { + HostMonitorData iHostMonitorID = iterator1.next(); + if (iHostMonitorID.getMonitorIDs().size() == 0) { + iterator1.remove(); + logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getHost() + .getType().getHostAddress()); + } + } if(take.getHostMonitorData().size()!=0) { queue.put(take); } http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index 6152505..531b8ff 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -170,11 +170,7 @@ public class CommonUtils { iterator2.remove(); logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring last " + "status:{}", monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString()); - if (iHostMonitorID.getMonitorIDs().size() == 0) { - iterator1.remove(); - logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getHost() - .getType().getHostAddress()); - } + return; } }