activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r533814 - in /activemq/camel/trunk/camel-mail/src: main/java/org/apache/camel/component/mail/ test/java/org/apache/camel/component/mail/
Date Mon, 30 Apr 2007 17:46:12 GMT
Author: jstrachan
Date: Mon Apr 30 10:46:11 2007
New Revision: 533814

URL: http://svn.apache.org/viewvc?view=rev&rev=533814
Log:
Completed the test case demonstrating consuming messages from a folder as well as
sending them.

Modified:
    activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java
    activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
    activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
    activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
    activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java

Modified: activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
--- activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java (original)
+++ activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java Mon Apr 30 10:46:11 2007
@@ -25,6 +25,8 @@
 import javax.mail.AuthenticationFailedException;
 import javax.mail.MessagingException;
 import javax.mail.Transport;
+import javax.mail.Folder;
+import javax.mail.Store;
 
 /**
  * An extension of Spring's {@link JavaMailSenderImpl} to provide helper methods for listening for new mail
@@ -33,23 +35,14 @@
  */
 public class JavaMailConnection extends JavaMailSenderImpl {
 
-    /**
-     * Create a new {@link Transport} which can then be used to consume new messages
-     *
-     * @throws MailAuthenticationException in case of authentication failure
-     * @throws MailSendException           in case of failure when sending a message
-     */
-    public Transport createTransport() throws MailException {
+    public Folder getFolder(String protocol, String folderName) {
         try {
-            Transport transport = getTransport(getSession());
-            transport.connect(getHost(), getPort(), getUsername(), getPassword());
-            return transport;
+            Store store = getSession().getStore(protocol);
+            store.connect(getHost(), getPort(), getUsername(), getPassword());
+            return store.getFolder(folderName);
         }
-        catch (AuthenticationFailedException ex) {
-            throw new MailAuthenticationException(ex);
-        }
-        catch (MessagingException ex) {
-            throw new MailSendException("Mail server connection failed", ex);
+        catch (MessagingException e) {
+            throw new MailSendException("Mail server connection failed", e);
         }
     }
 }

Modified: activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
--- activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java (original)
+++ activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java Mon Apr 30 10:46:11 2007
@@ -39,6 +39,8 @@
     private int port = -1;
     private String destination;
     private String from = "camel@localhost";
+    private boolean deleteProcessedMessages = true;
+    private String folderName = "INBOX";
 
     public MailConfiguration() {
     }
@@ -83,6 +85,9 @@
         if (fragment == null || fragment.length() == 0) {
             fragment = userInfo + "@" + host;
         }
+        else {
+            setFolderName(fragment);
+        }
         setDestination(fragment);
     }
 
@@ -197,5 +202,21 @@
 
     public void setFrom(String from) {
         this.from = from;
+    }
+
+    public boolean isDeleteProcessedMessages() {
+        return deleteProcessedMessages;
+    }
+
+    public void setDeleteProcessedMessages(boolean deleteProcessedMessages) {
+        this.deleteProcessedMessages = deleteProcessedMessages;
+    }
+
+    public String getFolderName() {
+        return folderName;
+    }
+
+    public void setFolderName(String folderName) {
+        this.folderName = folderName;
     }
 }

Modified: activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
--- activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Mon Apr 30 10:46:11 2007
@@ -19,12 +19,18 @@
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.PollingConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+import javax.mail.Flags;
+import javax.mail.Folder;
 import javax.mail.Message;
+import javax.mail.MessagingException;
 import javax.mail.Transport;
-import javax.mail.event.TransportEvent;
-import javax.mail.event.TransportListener;
+import javax.mail.event.MessageCountEvent;
+import javax.mail.event.MessageCountListener;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * A {@link Consumer} which consumes messages from JavaMail using a {@link Transport} and dispatches them
@@ -32,37 +38,96 @@
  *
  * @version $Revision: 523430 $
  */
-public class MailConsumer extends DefaultConsumer<MailExchange> implements TransportListener {
+public class MailConsumer extends PollingConsumer<MailExchange> implements MessageCountListener {
+    private static final transient Log log = LogFactory.getLog(MailConsumer.class);
     private final MailEndpoint endpoint;
-    private final Transport transport;
+    private final Folder folder;
 
-    public MailConsumer(MailEndpoint endpoint, Processor<MailExchange> processor, Transport transport) {
-        super(endpoint, processor);
+    public MailConsumer(MailEndpoint endpoint, Processor<MailExchange> processor, Folder folder) {
+        super(endpoint, processor, endpoint.getExecutorService());
         this.endpoint = endpoint;
-        this.transport = transport;
+        this.folder = folder;
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        transport.addTransportListener(this);
+        ensureFolderIsOpen();
+        folder.addMessageCountListener(this);
     }
 
     @Override
     protected void doStop() throws Exception {
-        transport.close();
+        folder.removeMessageCountListener(this);
+        folder.close(true);
         super.doStop();
     }
 
-    public void messageDelivered(TransportEvent transportEvent) {
-        Message message = transportEvent.getMessage();
+    public void messagesAdded(MessageCountEvent event) {
+        Message[] messages = event.getMessages();
+        for (Message message : messages) {
+            try {
+                if (!message.getFlags().contains(Flags.Flag.DELETED)) {
+                    processMessage(message);
+
+                    flagMessageDeleted(message);
+                }
+            }
+            catch (MessagingException e) {
+                handleException(e);
+            }
+        }
+    }
+
+    public void messagesRemoved(MessageCountEvent event) {
+        Message[] messages = event.getMessages();
+        for (Message message : messages) {
+            if (log.isDebugEnabled()) {
+                try {
+                    log.debug("Removing message: " + message.getSubject());
+                }
+                catch (MessagingException e) {
+                    log.debug("Ignored: " + e);
+                }
+            }
+        }
+    }
+
+    protected void poll() throws Exception {
+        ensureFolderIsOpen();
+
+        int count = folder.getMessageCount();
+        if (count > 0) {
+            Message message = folder.getMessage(1);
+
+            processMessage(message);
+
+            flagMessageDeleted(message);
+        }
+        else if (count == -1) {
+            throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
+        }
+
+        folder.close(true);
+    }
+
+    protected void processMessage(Message message) {
         MailExchange exchange = endpoint.createExchange(message);
         getProcessor().process(exchange);
     }
 
-    public void messageNotDelivered(TransportEvent transportEvent) {
+    protected void ensureFolderIsOpen() throws MessagingException {
+        if (!folder.isOpen()) {
+            folder.open(Folder.READ_WRITE);
+        }
     }
 
-    public void messagePartiallyDelivered(TransportEvent transportEvent) {
+    protected void flagMessageDeleted(Message message) throws MessagingException {
+        if (endpoint.getConfiguration().isDeleteProcessedMessages()) {
+            message.setFlag(Flags.Flag.DELETED, true);
+        }
+        else {
+            message.setFlag(Flags.Flag.SEEN, true);
+        }
     }
 }

Modified: activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
--- activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java (original)
+++ activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java Mon Apr 30 10:46:11 2007
@@ -24,7 +24,7 @@
 import org.springframework.mail.javamail.JavaMailSender;
 
 import javax.mail.Message;
-import javax.mail.Transport;
+import javax.mail.Folder;
 
 /**
  * @version $Revision:520964 $
@@ -52,19 +52,28 @@
 
     public Consumer<MailExchange> createConsumer(Processor<MailExchange> processor) throws Exception {
         JavaMailConnection connection = configuration.createJavaMailConnection(this);
-        return createConsumer(processor, connection.createTransport());
+        String protocol = getConfiguration().getProtocol();
+        if (protocol.equals("smtp")) {
+            protocol = "pop3";
+        }
+        String folderName = getConfiguration().getFolderName();
+        Folder folder = connection.getFolder(protocol, folderName);
+        if (folder == null) {
+            throw new IllegalArgumentException("No folder for protocol: " + protocol + " and name: " + folderName);
+        }
+        return createConsumer(processor, folder);
     }
 
     /**
      * Creates a consumer using the given processor and transport
      *
      * @param processor the processor to use to process the messages
-     * @param transport the JavaMail transport to use for inbound messages
+     * @param folder the JavaMail Folder to use for inbound messages
      * @return a newly created consumer
      * @throws Exception if the consumer cannot be created
      */
-    public Consumer<MailExchange> createConsumer(Processor<MailExchange> processor, Transport transport) throws Exception {
-        return startService(new MailConsumer(this, processor, transport));
+    public Consumer<MailExchange> createConsumer(Processor<MailExchange> processor, Folder folder) throws Exception {
+        return startService(new MailConsumer(this, processor, folder));
     }
 
     public MailExchange createExchange() {

Modified: activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
--- activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java (original)
+++ activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java Mon Apr 30 10:46:11 2007
@@ -20,6 +20,7 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
 import static org.apache.camel.builder.Builder.constant;
 import org.apache.camel.builder.RouteBuilder;
 import static org.apache.camel.util.ObjectHelper.asString;
@@ -34,7 +35,12 @@
  * @version $Revision: 1.1 $
  */
 public class MailRouteTest extends ContextTestSupport {
+    private MockEndpoint resultEndpoint;
+
     public void testSendAndReceiveMails() throws Exception {
+        resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(1);
+
         client.send("smtp://james@localhost", new Processor<Exchange>() {
             public void process(Exchange exchange) {
                 exchange.getIn().setBody("hello world!");
@@ -45,8 +51,9 @@
         assertMailboxReceivedMessages("james@localhost");
 
         // lets test the receive worked
-        // TODO 
-        // assertMailboxReceivedMessages("result@localhost");
+        resultEndpoint.assertIsSatisfied(5000);
+
+        assertMailboxReceivedMessages("copy@localhost");
     }
 
     protected void assertMailboxReceivedMessages(String name) throws IOException, MessagingException {
@@ -62,8 +69,9 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("smtp://james@localhost").to("direct:a");
-                from("direct:a").setHeader("name", constant("James")).to("pop3:result@localhost");
+                from("smtp://james@localhost").to("queue:a");
+                from("queue:a").to("smtp://result@localhost", "smtp://copy@localhost");
+                from("smtp://result@localhost").to("mock:result");
             }
         };
     }



Mime
View raw message