airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject airavata git commit: Adding Email monitoring code provided by Siddharth
Date Mon, 20 Jun 2016 19:15:49 GMT
Repository: airavata
Updated Branches:
  refs/heads/develop 120d06af6 -> a79e69028


Adding Email monitoring code provided by Siddharth


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a79e6902
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a79e6902
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a79e6902

Branch: refs/heads/develop
Commit: a79e69028869dc8353d474aa65a6ba5ff3926b0b
Parents: 120d06a
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Mon Jun 20 15:15:39 2016 -0400
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Mon Jun 20 15:15:39 2016 -0400

----------------------------------------------------------------------
 modules/monitoring/pom.xml                      |  13 +++
 .../airavata/monitoring/MessageExtract.java     | 109 +++++++++++++++++++
 .../org/apache/airavata/monitoring/Util.java    |  37 +++++++
 .../monitoring/consumer/EmailConsumer.java      |  84 ++++++++++++++
 .../monitoring/consumer/EmailReceiver.java      |  60 ++++++++++
 .../monitoring/mailbox/GmailSMTPMailBox.java    |  78 +++++++++++++
 .../airavata/monitoring/mailbox/MailBox.java    |  19 ++++
 .../producer/RabbitMQEmailPublisher.java        |  94 ++++++++++++++++
 .../monitoring/simulator/FetchPublish.java      |  39 +++++++
 .../monitoring/simulator/Simulator.java         |  50 +++++++++
 10 files changed, 583 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/pom.xml
----------------------------------------------------------------------
diff --git a/modules/monitoring/pom.xml b/modules/monitoring/pom.xml
index c37e0a8..dae1e1d 100644
--- a/modules/monitoring/pom.xml
+++ b/modules/monitoring/pom.xml
@@ -15,5 +15,18 @@
     <packaging>jar</packaging>
     <name>Airavata Monitoring</name>
     <url>http://airavata.apache.org/</url>
+    <dependencies>
+        <dependency>
+            <groupId>com.sun.mail</groupId>
+            <artifactId>javax.mail</artifactId>
+            <version>1.5.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>3.6.1</version>
+        </dependency>
+    </dependencies>
     
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/MessageExtract.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/MessageExtract.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/MessageExtract.java
new file mode 100755
index 0000000..18d38b2
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/MessageExtract.java
@@ -0,0 +1,109 @@
+package org.apache.airavata.monitoring;
+
+import javax.mail.Address;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Represents the required data extracted from javax.mail.Message, which is serializable
+ *
+ * @author Siddharth Jain
+ */
+public class MessageExtract implements Serializable {
+    private static final long serialVersionUID = 8129498758236043568L;
+    private Address from;
+    private Address[] recipients;
+    private String content;
+    private String subject;
+
+    public MessageExtract(Message message) throws MessagingException {
+        String subject = message.getSubject();
+        // TODO fetch the actual content
+        String content = "content";
+        Address from = message.getFrom()[0];
+        Address[] recepientAddresses = message
+                .getRecipients(Message.RecipientType.TO);
+        this.from = from;
+        this.recipients = recepientAddresses;
+        this.content = content;
+        this.subject = subject;
+    }
+
+    public MessageExtract(Address from, Address[] recipients, String content,
+                          String subject) {
+        super();
+        this.from = from;
+        this.recipients = recipients;
+        this.content = content;
+        this.subject = subject;
+    }
+
+    public Address getFrom() {
+        return from;
+    }
+
+    public void setFrom(Address from) {
+        this.from = from;
+    }
+
+    public Address[] getRecipients() {
+        return recipients;
+    }
+
+    public void setRecipients(Address[] recipients) {
+        this.recipients = recipients;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    private void writeObject(ObjectOutputStream stream)
+            throws IOException {
+        stream.defaultWriteObject();
+    }
+
+    private void readObject(java.io.ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        stream.defaultReadObject();
+    }
+
+    /**
+     * Get Serialized bytes of the instance
+     *
+     * @return Serialized bytes of the instance
+     * @throws IOException
+     */
+    public byte[] getSerializedBytes() throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream os = new ObjectOutputStream(bos);
+        os.writeObject(this);
+        byte[] msgExtractBytes = bos.toByteArray();
+        return msgExtractBytes;
+    }
+
+    @Override
+    public String toString() {
+        return "EmailMessage [from=" + from + ", recipients="
+                + Arrays.toString(recipients) + ", content=" + content
+                + ", subject=" + subject + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/Util.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/Util.java b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/Util.java
new file mode 100755
index 0000000..f28b42d
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/Util.java
@@ -0,0 +1,37 @@
+package org.apache.airavata.monitoring;
+
+import java.util.Properties;
+
+public class Util {
+    /**
+     * Fetch SMTP Properties. Will be reworked to fetch the properties from properties file.
+     *
+     * @return SMTP Properties
+     */
+    public static Properties getSMTPProperties() {
+        Properties props = new Properties();
+        props.setProperty("mail.store.protocol", "imaps");
+        props.setProperty("mail.smtp.host", "smtp.gmail.com");
+        props.setProperty("mail.smtp.socketFactory.port", "993");
+        props.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
+        props.setProperty("mail.smtp.port", "993");
+        props.setProperty("mail.userID", "test.airavata@gmail.com");
+        props.setProperty("mail.password", "airavata");
+        return props;
+    }
+
+    /**
+     * Fetch Broker Properties. Will be reworked to fetch the properties from properties
file.
+     *
+     * @return
+     */
+    public static Properties getBrokerProperties() {
+        Properties props = new Properties();
+        props.setProperty("monitor.email.exchange.name", "monitor");
+        props.setProperty("monitor.email.broker.URI", "amqp://localhost:5672");
+        props.setProperty("monitor.email.broker.queue1.name", "q1");
+        props.setProperty("monitor.email.broker.queue2.name", "q2");
+        return props;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailConsumer.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailConsumer.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailConsumer.java
new file mode 100755
index 0000000..b536ac4
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailConsumer.java
@@ -0,0 +1,84 @@
+package org.apache.airavata.monitoring.consumer;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.airavata.monitoring.MessageExtract;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+
+public class EmailConsumer extends DefaultConsumer {
+
+    public EmailConsumer(Channel channel) {
+        super(channel);
+    }
+
+    @Override
+    public void handleDelivery(String consumerTag, Envelope envelope,
+                               AMQP.BasicProperties properties, byte[] body) throws IOException
{
+        ByteArrayInputStream bis = new ByteArrayInputStream(body);
+        ObjectInput in = new ObjectInputStream(bis);
+        MessageExtract msgExtract = null;
+        Message message = null;
+        try {
+            // deserializing the message received from broker into
+            // MessageExtract
+            msgExtract = (MessageExtract) in.readObject();
+            // reconstructing the javax Message
+            message = reContructMessage(msgExtract);
+            System.out.println(" [x] Received message from'"
+                    + message.getFrom()[0].toString() + "'");
+            System.out.println(" [x] Received message Recepients'"
+                    + message.getRecipients(Message.RecipientType.TO)[0]
+                    .toString() + "'");
+            System.out.println(" [x] Received message subject'"
+                    + message.getSubject() + "'");
+            System.out.println(" [x] Received message content'"
+                    + message.getContent() + "'");
+            processMessage(message);
+        } catch (ClassNotFoundException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (MessagingException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+
+    }
+
+    /**
+     * @param msgExtract
+     * @return Message with content, subject,from and to fields representative
+     * of original e-mail message
+     * @throws MessagingException
+     */
+    private Message reContructMessage(MessageExtract msgExtract)
+            throws MessagingException {
+        Message message = new MimeMessage((Session) null);
+        message = new MimeMessage((Session) null);
+        message.setText(msgExtract.getContent());
+        message.setSubject(msgExtract.getSubject());
+        message.addRecipients(Message.RecipientType.TO,
+                msgExtract.getRecipients());
+        message.setFrom(msgExtract.getFrom());
+        return message;
+    }
+
+    /***
+     * Process e-mail message
+     *
+     * @param message e-mail message
+     */
+    private void processMessage(Message message) {
+        // TODO processing
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailReceiver.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailReceiver.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailReceiver.java
new file mode 100755
index 0000000..a225290
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/consumer/EmailReceiver.java
@@ -0,0 +1,60 @@
+package org.apache.airavata.monitoring.consumer;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Consumer;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.TimeoutException;
+
+public class EmailReceiver extends Thread {
+    private static volatile Connection connection;
+    private static volatile Channel channel;
+    private static final String EXCHANGE_TYPE = "fanout";
+
+    private String exchangeName;
+    private String queueName;
+    private String brokerURI;
+    private Thread recieverThread;
+
+    public EmailReceiver(String exchangeName, String queueName, String brokerURI) {
+        this.exchangeName = exchangeName;
+        this.queueName = queueName;
+        this.brokerURI = brokerURI;
+    }
+
+    public void startThread() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException,
IOException, TimeoutException {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setAutomaticRecoveryEnabled(true);
+        factory.setUri(brokerURI);
+        connection = factory.newConnection();
+        recieverThread = new Thread(this);
+        recieverThread.start();
+    }
+
+    public void shutdown() throws IOException, TimeoutException {
+        channel.close();
+        connection.close();
+        System.out.println("Email receiver thread succesfully shutdown");
+    }
+
+    @Override
+    public void run() {
+        try {
+            channel = connection.createChannel();
+            channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE);
+            channel.queueDeclare(queueName, true, false, false, null).getQueue();
+            channel.queueBind(queueName, exchangeName, "");
+            System.out.println(" [*] Waiting for messages.");
+            Consumer consumer = new EmailConsumer(channel);
+            channel.basicConsume(queueName, true, consumer);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/GmailSMTPMailBox.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/GmailSMTPMailBox.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/GmailSMTPMailBox.java
new file mode 100755
index 0000000..10450d0
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/GmailSMTPMailBox.java
@@ -0,0 +1,78 @@
+package org.apache.airavata.monitoring.mailbox;
+
+import javax.mail.*;
+import javax.mail.search.FlagTerm;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * GmailSMTPMailBox class represents Gmail SMTP Mail Box
+ *
+ * @author Siddharth Jain
+ */
+public class GmailSMTPMailBox implements MailBox {
+    //search criterion for mails
+    private enum SearchCriterion {
+        UNREAD
+    }
+
+    //TODO change it to READ_WRITE in production
+    private static final int MAIL_BOX_MODE = Folder.READ_ONLY;
+    private static final String INBOX_FOLDER_NAME = "inbox";
+    private Store store;
+    private Folder inbox;
+    private Map<SearchCriterion, FlagTerm> searchCriterion;
+
+    public GmailSMTPMailBox(Properties props) throws MessagingException {
+        // initialize message store
+        store = getMessageStore(props);
+        // intialize inbox
+        inbox = store.getFolder(INBOX_FOLDER_NAME);
+        inbox.open(MAIL_BOX_MODE);
+        initSearchCriterions();
+    }
+
+    /**
+     * Get Message Store
+     *
+     * @return Message Store
+     * @throws MessagingException
+     */
+    private Store getMessageStore(Properties props) throws MessagingException {
+        Session session = Session.getDefaultInstance(props, null);
+        Store store = session.getStore(props.getProperty("mail.store.protocol"));
+        store.connect(props.getProperty("mail.smtp.host"),
+                props.getProperty("mail.userID"),
+                props.getProperty("mail.password"));
+        return store;
+    }
+
+    /**
+     * Populate all the search criterions in searchCriterion map
+     */
+    private void initSearchCriterions() {
+        searchCriterion = new EnumMap<SearchCriterion, FlagTerm>(
+                SearchCriterion.class);
+        Flags seen = new Flags(Flags.Flag.SEEN);
+        FlagTerm unseenFlagTerm = new FlagTerm(seen, false);
+        searchCriterion.put(SearchCriterion.UNREAD, unseenFlagTerm);
+    }
+
+    @Override
+    public Message[] getUnreadMessages() throws MessagingException {
+        Message messages[] = inbox.search(searchCriterion
+                .get(SearchCriterion.UNREAD));
+        return messages;
+    }
+
+    /**
+     * Close all the connections to MailBox
+     *
+     * @throws MessagingException
+     */
+    public void closeConnection() throws MessagingException {
+        inbox.close(true);
+        store.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/MailBox.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/MailBox.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/MailBox.java
new file mode 100755
index 0000000..2a285b8
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/mailbox/MailBox.java
@@ -0,0 +1,19 @@
+package org.apache.airavata.monitoring.mailbox;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+
+/**
+ * MailBox class represents a SMTP/IMAP e-mail box
+ *
+ * @author Siddharth Jain
+ */
+public interface MailBox {
+    /**
+     * Get Unread Mails
+     *
+     * @return Unread Mail Messages
+     * @throws MessagingException
+     */
+    Message[] getUnreadMessages() throws MessagingException;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/producer/RabbitMQEmailPublisher.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/producer/RabbitMQEmailPublisher.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/producer/RabbitMQEmailPublisher.java
new file mode 100755
index 0000000..5348b0b
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/producer/RabbitMQEmailPublisher.java
@@ -0,0 +1,94 @@
+package org.apache.airavata.monitoring.producer;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.airavata.monitoring.MessageExtract;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.TimeoutException;
+
+public class RabbitMQEmailPublisher {
+    private static final String EXCHANGE_TYPE = "fanout";
+    private ConnectionFactory factory;
+    private Connection connection;
+    private Channel channel;
+    private String exchangeName;
+
+    /**
+     * RabbitMQ Publisher with auto-recovery enabled
+     *
+     * @param exchangeName Name of the exchange
+     * @param brokerURL    Broker URL
+     * @param queueNames   Name of the queues which needs to be declared and binded to
+     *                     the exchange
+     * @throws IOException
+     * @throws TimeoutException
+     * @throws KeyManagementException
+     * @throws NoSuchAlgorithmException
+     * @throws URISyntaxException
+     */
+    public RabbitMQEmailPublisher(String exchangeName, String brokerURL,
+                                  String[] queueNames) throws IOException, TimeoutException,
+            KeyManagementException, NoSuchAlgorithmException,
+            URISyntaxException {
+        this.exchangeName = exchangeName;
+        // TODO get singleton instance of connection factory
+        this.factory = new ConnectionFactory();
+        this.factory.setUri(brokerURL);
+        factory.setAutomaticRecoveryEnabled(true);
+        this.connection = factory.newConnection();
+        this.channel = connection.createChannel();
+        this.channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE);
+        for (String queueName : queueNames) {
+            // declare all the queues
+            channel.queueDeclare(queueName, true, false, false, null)
+                    .getQueue();
+            // bind all the queues to exchange
+            channel.queueBind(queueName, exchangeName, "");
+        }
+    }
+
+    public void publish(byte[] message) throws IOException {
+        channel.basicPublish(exchangeName, "", null, message);
+        System.out.println("[*]Publisher: Message Sent to Exchange:" + exchangeName + "");
+    }
+
+    /**
+     * Publish Message to exchange
+     *
+     * @param message Message to be published
+     * @throws IOException
+     * @throws MessagingException
+     */
+    public void publishMessage(Message message) throws IOException,
+            MessagingException {
+        MessageExtract msgExtract = new MessageExtract(message);
+        publish(msgExtract.getSerializedBytes());
+    }
+
+    /**
+     * Publish Messages to exchange
+     *
+     * @param messages Messages to be published
+     * @throws IOException
+     * @throws MessagingException
+     */
+    public void publishMessages(Message[] messages) throws IOException,
+            MessagingException {
+        for (Message message : messages) {
+            publishMessage(message);
+        }
+    }
+
+    public void shutdown() throws IOException, TimeoutException {
+        channel.close();
+        connection.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/FetchPublish.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/FetchPublish.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/FetchPublish.java
new file mode 100755
index 0000000..ad52675
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/FetchPublish.java
@@ -0,0 +1,39 @@
+package org.apache.airavata.monitoring.simulator;
+
+import org.apache.airavata.monitoring.Util;
+import org.apache.airavata.monitoring.mailbox.GmailSMTPMailBox;
+import org.apache.airavata.monitoring.mailbox.MailBox;
+import org.apache.airavata.monitoring.producer.RabbitMQEmailPublisher;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+public class FetchPublish {
+    private static final String EXCHANGE_NAME = "monitor";
+
+    public static void fetchEmailAndPublish() throws MessagingException, KeyManagementException,
NoSuchAlgorithmException, IOException, TimeoutException, URISyntaxException {
+        RabbitMQEmailPublisher publisher = getRabbitMQEmailPublisher();
+        MailBox gmailSmtpMailBox = new GmailSMTPMailBox(Util.getSMTPProperties());
+        Message[] messages = gmailSmtpMailBox.getUnreadMessages();
+        publisher.publishMessages(messages);
+        publisher.shutdown();
+    }
+
+
+    private static RabbitMQEmailPublisher getRabbitMQEmailPublisher() throws KeyManagementException,
NoSuchAlgorithmException, IOException, TimeoutException, URISyntaxException {
+        Properties brokerProps = Util.getBrokerProperties();
+        String exchangeName = brokerProps.getProperty("monitor.email.exchange.name");
+        String brokerURI = brokerProps.getProperty("monitor.email.broker.URI");
+        String[] queueNames = new String[]{brokerProps.getProperty("monitor.email.broker.queue1.name"),
brokerProps.getProperty("monitor.email.broker.queue2.name")};
+        RabbitMQEmailPublisher publisher = new RabbitMQEmailPublisher(exchangeName, brokerURI,
queueNames);
+        return publisher;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a79e6902/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/Simulator.java
----------------------------------------------------------------------
diff --git a/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/Simulator.java
b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/Simulator.java
new file mode 100755
index 0000000..48382d1
--- /dev/null
+++ b/modules/monitoring/src/main/java/org/apache/airavata/monitoring/simulator/Simulator.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.monitoring.simulator;
+
+import org.apache.airavata.monitoring.consumer.EmailReceiver;
+
+public class Simulator {
+    private static final String EXCHANGE_NAME = "monitor";
+    private static final String QUEUE_NAME1 = "q1";
+    private static final String QUEUE_NAME2 = "q2";
+    private static final String BROKER_URI = "amqp://localhost:5672";
+
+    public static void main(String args[]) {
+        try {
+            //Consumer 1
+            EmailReceiver emailReciever1 = new EmailReceiver(EXCHANGE_NAME, QUEUE_NAME1,
BROKER_URI);
+            //Consumer 2
+            EmailReceiver emailReciever2 = new EmailReceiver(EXCHANGE_NAME, QUEUE_NAME2,
BROKER_URI);
+            emailReciever1.startThread();
+            emailReciever2.startThread();
+            //publisher
+            FetchPublish.fetchEmailAndPublish();
+            Thread.sleep(60000);
+            //shutdown after a minute, for demo purposes
+            emailReciever1.shutdown();
+            emailReciever2.shutdown();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}


Mime
View raw message