airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [12/16] airavata git commit: fixig concurrent modification exctpio in monitoring
Date Tue, 11 Nov 2014 19:10:59 GMT
fixig concurrent modification exctpio in monitoring


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/af73c444
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/af73c444
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/af73c444

Branch: refs/heads/gfac_appcatalog_int
Commit: af73c44468149b23053da8dc3247e78894fd751d
Parents: 97b384b
Author: Ginnaliya Gamathige <lginnali@149-160-220-203.dhcp-bl.indiana.edu>
Authored: Tue Nov 11 10:50:02 2014 -0500
Committer: Ginnaliya Gamathige <lginnali@149-160-220-203.dhcp-bl.indiana.edu>
Committed: Tue Nov 11 10:50:02 2014 -0500

----------------------------------------------------------------------
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 66 +++++++++++---------
 .../airavata/gfac/monitor/util/CommonUtils.java |  6 +-
 2 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 8e5f758..66cc5f7 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -158,21 +158,22 @@ public class HPCPullMonitor extends PullMonitor {
         try {
             take = this.queue.take();
             List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
-            for (HostMonitorData iHostMonitorData : hostMonitorData) {
+            for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator();
hostIterator.hasNext();) {
+                HostMonitorData iHostMonitorData = hostIterator.next();
                 if (iHostMonitorData.getHost().getType() instanceof GsisshHostType
                         || iHostMonitorData.getHost().getType() instanceof SSHHostType) {
-                    String hostName =  iHostMonitorData.getHost().getType().getHostAddress();
+                    String hostName = iHostMonitorData.getHost().getType().getHostAddress();
                     ResourceConnection connection = null;
                     if (connections.containsKey(hostName)) {
-                        if(!connections.get(hostName).isConnected()){
-                            connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo());
+                        if (!connections.get(hostName).isConnected()) {
+                            connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
                             connections.put(hostName, connection);
-                        }else{
+                        } else {
                             logger.debug("We already have this connection so not going to
create one");
                             connection = connections.get(hostName);
                         }
                     } else {
-                        connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo());
+                        connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
                         connections.put(hostName, connection);
                     }
 
@@ -180,18 +181,18 @@ public class HPCPullMonitor extends PullMonitor {
                     List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
                     Iterator<String> iterator1 = cancelJobList.iterator();
                     ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator();
-                    while (monitorIDListIterator.hasNext()){
+                    while (monitorIDListIterator.hasNext()) {
                         MonitorID iMonitorID = monitorIDListIterator.next();
-                        while(iterator1.hasNext()) {
+                        while (iterator1.hasNext()) {
                             String cancelMId = iterator1.next();
                             if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID()))
{
                                 iMonitorID.setStatus(JobState.CANCELED);
-                                CommonUtils.removeMonitorFromQueue(take,iMonitorID);
+                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
                                 logger.debugId(cancelMId, "Found a match in cancel monitor
queue, hence moved to the " +
                                                 "completed job queue, experiment {}, task
{} , job {}",
                                         iMonitorID.getExperimentID(), iMonitorID.getTaskID(),
iMonitorID.getJobID());
                                 logger.info("Job cancelled: marking the Job as ************CANCELLED************
experiment {}, task {}, job name {} .",
-                                        iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(),
iMonitorID.getJobName());
                                 sendNotification(iMonitorID);
                                 monitorIDListIterator.remove();
                                 GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac,
iMonitorID, publisher));
@@ -201,22 +202,19 @@ public class HPCPullMonitor extends PullMonitor {
                         iterator1 = cancelJobList.iterator();
                     }
                     synchronized (completedJobsFromPush) {
-                        ListIterator<String> iterator = completedJobsFromPush.listIterator();
-                        monitorIDListIterator = monitorID.listIterator();
-                        while (monitorIDListIterator.hasNext()) {
-                            MonitorID iMonitorID = monitorIDListIterator.next();
-                            String completeId = null;
-                            while (iterator.hasNext()) {
-                                 completeId = iterator.next();
+                        for (ListIterator<String> iterator = completedJobsFromPush.listIterator();
iterator.hasNext(); ) {
+                            String completeId = iterator.next();
+                            for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext();
) {
+                                MonitorID iMonitorID = monitorIDListIterator.next();
                                 if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName()))
{
                                     logger.info("This job is finished because push notification
came with <username,jobName> " + completeId);
                                     iMonitorID.setStatus(JobState.COMPLETE);
-                                    CommonUtils.removeMonitorFromQueue(take,iMonitorID);//we
have to make this empty everytime we iterate, otherwise this list will accumulate and will
lead to a memory leak
+                                    CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we
have to make this empty everytime we iterate, otherwise this list will accumulate and will
lead to a memory leak
                                     logger.debugId(completeId, "Push notification updated
job {} status to {}. " +
                                                     "experiment {} , task {}.", iMonitorID.getJobID(),
JobState.COMPLETE.toString(),
                                             iMonitorID.getExperimentID(), iMonitorID.getTaskID());
                                     logger.info("AMQP message recieved: marking the Job as
************COMPLETE************ experiment {}, task {}, job name {} .",
-                                            iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                            iMonitorID.getExperimentID(), iMonitorID.getTaskID(),
iMonitorID.getJobName());
 
                                     iterator.remove();
                                     sendNotification(iMonitorID);
@@ -224,36 +222,34 @@ public class HPCPullMonitor extends PullMonitor {
                                     break;
                                 }
                             }
-                            iterator = completedJobsFromPush.listIterator();
                         }
                     }
 
                     // we have to get this again because we removed the already completed
jobs with amqp messages
                     monitorID = iHostMonitorData.getMonitorIDs();
                     Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
-                    Iterator<MonitorID> iterator = monitorID.listIterator();
-                    while (iterator.hasNext()) {
+                    for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext();
) {
                         MonitorID iMonitorID = iterator.next();
                         currentMonitorID = iMonitorID;
-                        if (!JobState.CANCELED.equals(iMonitorID.getStatus())&&
+                        if (!JobState.CANCELED.equals(iMonitorID.getStatus()) &&
                                 !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() +
"," + iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a logic
-                        }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
+                        } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed
jobs map, experiment {}, " +
                                     "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(),
iMonitorID.getTaskID());
-                            CommonUtils.removeMonitorFromQueue(take,iMonitorID);
+                            CommonUtils.removeMonitorFromQueue(take, iMonitorID);
                             logger.info("PULL Notification is complete: marking the Job as
************COMPLETE************ experiment {}, task {}, job name {} .",
-                                    iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                    iMonitorID.getExperimentID(), iMonitorID.getTaskID(),
iMonitorID.getJobName());
                             GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac,
iMonitorID, publisher));
                         }
-                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));
   //IMPORTANT this is not a simple setter we have a logic
+                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + ","
+ iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a logic
                         iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                         sendNotification(iMonitorID);
                         // if the job is completed we do not have to put the job to the queue
again
                         iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                     }
-                    iterator = monitorID.listIterator();
-                    while(iterator.hasNext()){
+
+                    for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext();
) {
                         MonitorID iMonitorID = iterator.next();
                         if (iMonitorID.getFailedCount() > FAILED_COUNT) {
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
@@ -276,9 +272,9 @@ public class HPCPullMonitor extends PullMonitor {
                                                 " Experiment {} , task {}", iMonitorID.getFailedCount(),
                                         iMonitorID.getExperimentID(), iMonitorID.getTaskID());
                                 logger.info("Listing directory came as complete: marking
the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
-                                        iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(),
iMonitorID.getJobName());
                                 sendNotification(iMonitorID);
-                                CommonUtils.removeMonitorFromQueue(take,iMonitorID);
+                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
                                 GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac,
iMonitorID, publisher));
                             } else {
                                 iMonitorID.setFailedCount(0);
@@ -302,6 +298,14 @@ public class HPCPullMonitor extends PullMonitor {
             // during individual monitorID removal we remove the HostMonitorData object if
it become empty
             // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData
object
             // we should remove it from the queue so here we do not put it back.
+            for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator();
iterator1.hasNext(); ) {
+                HostMonitorData iHostMonitorID = iterator1.next();
+                if (iHostMonitorID.getMonitorIDs().size() == 0) {
+                    iterator1.remove();
+                    logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getHost()
+                            .getType().getHostAddress());
+                }
+            }
             if(take.getHostMonitorData().size()!=0) {
                 queue.put(take);
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 6152505..531b8ff 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -170,11 +170,7 @@ public class CommonUtils {
                                     iterator2.remove();
                                     logger.infoId(monitorID.getJobID(), "Removed the jobId:
{} JobName: {} from monitoring last " +
                                             "status:{}", monitorID.getJobID(),monitorID.getJobName(),
monitorID.getStatus().toString());
-                                    if (iHostMonitorID.getMonitorIDs().size() == 0) {
-                                        iterator1.remove();
-                                        logger.debug("Removed host {} from monitoring queue",
iHostMonitorID.getHost()
-                                                .getType().getHostAddress());
-                                    }
+
                                     return;
                                 }
                             }


Mime
View raw message