airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [2/6] airavata git commit: Fixed - AIRAVATA-1684 with ReceivedDateTerm
Date Wed, 22 Apr 2015 20:43:10 GMT
Fixed - AIRAVATA-1684 with ReceivedDateTerm


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a5b7c59c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a5b7c59c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a5b7c59c

Branch: refs/heads/master
Commit: a5b7c59ced51634adddafce784996413c9c0e97d
Parents: cd9f581
Author: shamrath <shameerainfo@gmail.com>
Authored: Mon Apr 20 18:48:47 2015 -0400
Committer: shamrath <shameerainfo@gmail.com>
Committed: Mon Apr 20 18:48:47 2015 -0400

----------------------------------------------------------------------
 .../gfac/monitor/email/EmailBasedMonitor.java   | 135 ++++++++++---------
 1 file changed, 75 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a5b7c59c/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index b649a56..a988752 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -49,7 +49,11 @@ import javax.mail.MessagingException;
 import javax.mail.Session;
 import javax.mail.Store;
 import javax.mail.search.FlagTerm;
+import javax.mail.search.ReceivedDateTerm;
+import javax.validation.constraints.NotNull;
 import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -58,19 +62,15 @@ import java.util.concurrent.ConcurrentHashMap;
 public class EmailBasedMonitor implements Runnable{
     private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
 
-    private static final String PBS_CONSULT_SDSC_EDU = "pbsconsult@sdsc.edu";
-    private static final String SLURM_BATCH_STAMPEDE = "slurm@batch1.stampede.tacc.utexas.edu";
-    private static final String LONESTAR_ADDRESS = "root@c312-206.ls4.tacc.utexas.edu";
+    public static final int COMPARISON = 6; // after and equal
     private final EmailMonitorProperty emailMonitorProperty;
     private boolean stopMonitoring = false;
 
     private Session session ;
     private Store store;
     private Folder emailFolder;
-//    private String host, emailAddress, password, folderName, mailStoreProtocol;
     private Properties properties;
     private final ResourceJobManagerType RESOURCE_JOB_MONITOR_TYPE;
-
     private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>();
 
     public EmailBasedMonitor(EmailMonitorProperty emailMonitorProp, ResourceJobManagerType type) {
@@ -85,21 +85,6 @@ public class EmailBasedMonitor implements Runnable{
 
     }
 
-/*    public static EmailBasedMonitor getInstant(EmailMonitorProperty emailMonitorProp, MonitorPublisher monitorPublisher)
-            throws ApplicationSettingsException {
-        if (emailBasedMonitor == null) {
-            synchronized (EmailBasedMonitor.class) {
-                if (emailBasedMonitor == null) {
-                    emailBasedMonitor = new EmailBasedMonitor(emailMonitorProp);
-                    Thread thread = new Thread(emailBasedMonitor);
-                    thread.start();
-                }
-            }
-        }
-
-        return emailBasedMonitor;
-    }*/
-
     public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
         addToJobMonitorMap(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext);
     }
@@ -136,50 +121,24 @@ public class EmailBasedMonitor implements Runnable{
             store = session.getStore(getProtocol(emailMonitorProperty.getStoreProtocol()));
             store.connect(emailMonitorProperty.getHost(), emailMonitorProperty.getEmailAddress(),
                     emailMonitorProperty.getPassword());
+            emailFolder = store.getFolder(emailMonitorProperty.getFolderName());
+            emailFolder.open(Folder.READ_WRITE);
+            // first time we search for all unread messages.
+            Date preDate = Calendar.getInstance().getTime();
+            Date nextDate;
+            processMessages(emailFolder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false)));
+            // then we search message continuously from prevDate to present.
             while (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
                 if (!store.isConnected()) {
                     store.connect();
+                    emailFolder = store.getFolder(emailMonitorProperty.getFolderName());
+                    emailFolder.open(Folder.READ_WRITE);
                 }
-                Thread.sleep(ServerSettings.getEmailMonitorPeriod());
-                emailFolder = store.getFolder(emailMonitorProperty.getFolderName());
-                emailFolder.open(Folder.READ_WRITE);
-                Message[] searchMessages = emailFolder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false));
-                List<Message> processedMessages = new ArrayList<>();
-                List<Message> unreadMessages = new ArrayList<>();
-                for (Message message : searchMessages) {
-                    try {
-                        JobStatusResult jobStatusResult = parse(message);
-                        JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
-                        if (jEC != null) {
-                            process(jobStatusResult, jEC);
-                            processedMessages.add(message);
-                        } else {
-                            // we can get JobExecutionContext null in multiple Gfac instances environment,
-                            // where this job is not submitted by this Gfac instance hence we ignore this message.
-                            unreadMessages.add(message);
-//                            log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId());
-                        }
-                    } catch (AiravataException e) {
-                        log.error("Error parsing email message =====================================>", e);
-                        try {
-                            writeEnvelopeOnError(message);
-                        } catch (MessagingException e1) {
-                            log.error("Error printing envelop of the email");
-                        }
-                    }
-                }
-                if (!processedMessages.isEmpty()) {
-                    Message[] seenMessages = new Message[processedMessages.size()];
-                    processedMessages.toArray(seenMessages);
-                    emailFolder.setFlags(seenMessages, new Flags(Flags.Flag.SEEN), true);
-
-                }
-                if (!unreadMessages.isEmpty()) {
-                    Message[] unseenMessages = new Message[unreadMessages.size()];
-                    unreadMessages.toArray(unseenMessages);
-                    emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
-                }
-                emailFolder.close(false);
+                Thread.sleep(ServerSettings.getEmailMonitorPeriod());// sleep a bit - get rest till job finishes
+                nextDate = Calendar.getInstance().getTime();
+                Message[] searchMessages = emailFolder.search(new ReceivedDateTerm(COMPARISON, preDate));
+                processMessages(searchMessages);
+                preDate = nextDate; // this is a critical line
             }
         } catch (MessagingException e) {
             log.error("Couldn't connect to the store ", e);
@@ -189,6 +148,7 @@ public class EmailBasedMonitor implements Runnable{
             log.error("UnHandled arguments ", e);
         } finally {
             try {
+                emailFolder.close(false);
                 store.close();
             } catch (MessagingException e) {
                 log.error("Store close operation failed, couldn't close store", e);
@@ -196,6 +156,61 @@ public class EmailBasedMonitor implements Runnable{
         }
     }
 
+    private void processMessages(Message[] searchMessages) throws MessagingException {
+        List<Message> processedMessages = new ArrayList<>();
+        List<Message> unreadMessages = new ArrayList<>();
+        for (Message message : searchMessages) {
+            try {
+                JobStatusResult jobStatusResult = parse(message);
+                JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
+                if (jEC != null) {
+                    process(jobStatusResult, jEC);
+                    processedMessages.add(message);
+                } else {
+                    // we can get JobExecutionContext null in multiple Gfac instances environment,
+                    // where this job is not submitted by this Gfac instance hence we ignore this message.
+                    unreadMessages.add(message);
+//                  log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId());
+                }
+            } catch (AiravataException e) {
+                log.error("Error parsing email message =====================================>", e);
+                try {
+                    writeEnvelopeOnError(message);
+                } catch (MessagingException e1) {
+                    log.error("Error printing envelop of the email");
+                }
+            } catch (MessagingException e) {
+                log.error("Error while retrieving sender address from message : " + message.toString());
+            }
+        }
+        if (!processedMessages.isEmpty()) {
+            Message[] seenMessages = new Message[processedMessages.size()];
+            processedMessages.toArray(seenMessages);
+            try {
+                emailFolder.setFlags(seenMessages, new Flags(Flags.Flag.SEEN), true);
+            } catch (MessagingException e) {
+                if (!store.isConnected()) {
+                    store.connect();
+                    emailFolder.setFlags(seenMessages, new Flags(Flags.Flag.SEEN), true);
+                }
+            }
+
+        }
+        if (!unreadMessages.isEmpty()) {
+            Message[] unseenMessages = new Message[unreadMessages.size()];
+            unreadMessages.toArray(unseenMessages);
+            try {
+                emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+            } catch (MessagingException e) {
+                if (!store.isConnected()) {
+                    store.connect();
+                    emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+
+                }
+            }
+        }
+    }
+
     private String getProtocol(EmailProtocol storeProtocol) throws AiravataException {
         switch (storeProtocol) {
             case IMAPS:


Mime
View raw message