camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r782062 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/test/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/co...
Date Fri, 05 Jun 2009 16:51:01 GMT
Author: davsclaus
Date: Fri Jun  5 16:51:00 2009
New Revision: 782062

URL: http://svn.apache.org/viewvc?rev=782062&view=rev
Log:
CAMEL-1654: Added maxMessagesPerPoll option to Batch consumers.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java
  (contents, props changed)
      - copied, changed from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java
      - copied, changed from r781916, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMultipleDirectoriesTest.java
    camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java
      - copied, changed from r781916, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
    camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
    camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
    camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
    camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java Fri Jun  5 16:51:00 2009
@@ -26,6 +26,18 @@
 public interface BatchConsumer extends Consumer {
 
     /**
+     * Sets a maximum number of messages as a limit to poll at each polling.
+     * <p/>
+     * Can be used to limit each poll to e.g. 100 messages, to avoid being flooded at startup
+     * when there are millions of messages waiting in the first poll.
+     * <p/>
+     * The default is unlimited; set it to 0 or a negative number to disable the limit.
+     *
+     * @param maxMessagesPerPoll  maximum messages to poll.
+     */
+    void setMaxMessagesPerPoll(int maxMessagesPerPoll);
+
+    /**
      * Processes the list of {@link org.apache.camel.Exchange} in a batch.
      * <p/>
      * Each message exchange will be processed individually but the batch

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java Fri Jun  5 16:51:00 2009
@@ -69,6 +69,9 @@
             idempotentRepository = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
         }
 
+        // set max messages per poll
+        result.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+
         configureConsumer(result);
         return result;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Fri Jun  5 16:51:00 2009
@@ -40,6 +40,7 @@
     protected GenericFileOperations<T> operations;
     protected boolean loggedIn;
     protected String fileExpressionResult;
+    protected int maxMessagesPerPoll;
 
     public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
         super(endpoint, processor);
@@ -94,9 +95,20 @@
         processBatch(exchanges);
     }
 
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
     @SuppressWarnings("unchecked")
     public void processBatch(Queue exchanges) {
         int total = exchanges.size();
+
+        // limit if needed
+        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+            log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
+            total = maxMessagesPerPoll;
+        }
+
         for (int index = 0; index < total && isRunAllowed(); index++) {
             // only loop if we are started (allowed to run)
             // use poll to remove the head so it does not consume memory even after we have processed it

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Fri Jun  5 16:51:00 2009
@@ -60,6 +60,7 @@
     protected boolean recursive;
     protected boolean delete;
     protected boolean flatten;
+    protected int maxMessagesPerPoll;
     protected String tempPrefix;
     protected String include;
     protected String exclude;
@@ -384,6 +385,14 @@
         this.localWorkDirectory = localWorkDirectory;
     }
 
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
     /**
      * Configures the given message with the file which sets the body to the
      * file object.

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java (from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java&r1=781916&r2=782062&rev=782062&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java Fri Jun  5 16:51:00 2009
@@ -16,49 +16,40 @@
  */
 package org.apache.camel.component.file;
 
-import java.io.File;
-
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for consuming multiple directories.
+ * Unit test for max messages per poll
  */
-public class FileConsumeMultipleDirectoriesTest extends ContextTestSupport {
+public class FileConsumeMaxMessagesPerPollTest extends ContextTestSupport {
 
-    private String fileUrl = "file://target/multidir/?recursive=true&delete=true&consumer.delay=5000&sortBy=file:path";
+    private String fileUrl = "file://target/poll/?consumer.delay=3000&sortBy=file:name&maxMessagesPerPoll=2";
 
     @Override
     protected void setUp() throws Exception {
-        deleteDirectory("target/multidir");
+        deleteDirectory("target/poll");
         super.setUp();
         template.sendBodyAndHeader(fileUrl, "Bye World", Exchange.FILE_NAME, "bye.txt");
-        template.sendBodyAndHeader(fileUrl, "Hello World", Exchange.FILE_NAME, "sub/hello.txt");
-        template.sendBodyAndHeader(fileUrl, "Godday World", Exchange.FILE_NAME, "sub/sub2/godday.txt");
+        template.sendBodyAndHeader(fileUrl, "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader(fileUrl, "Godday World", Exchange.FILE_NAME, "godday.txt");
     }
 
-    public void testMultiDir() throws Exception {
+    public void testMaxMessagesPerPoll() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Bye World", "Hello World", "Godday World");
+        mock.expectedBodiesReceived("Bye World", "Godday World");
+        mock.setResultWaitTime(2000);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
 
         assertMockEndpointsSatisfied();
 
-        GenericFileExchange<File> exchange = (GenericFileExchange<File>) mock.getExchanges().get(0);
-        File file = exchange.getGenericFile().getFile();
-        assertDirectoryEquals("target/multidir/bye.txt", file.getPath());
-        assertEquals("bye.txt", file.getName());
-
-        exchange = (GenericFileExchange<File>) mock.getExchanges().get(1);
-        file = exchange.getGenericFile().getFile();
-        assertDirectoryEquals("target/multidir/sub/hello.txt", file.getPath());
-        assertEquals("hello.txt", file.getName());
-
-        exchange = (GenericFileExchange<File>) mock.getExchanges().get(2);
-        file = exchange.getGenericFile().getFile();
-        assertDirectoryEquals("target/multidir/sub/sub2/godday.txt", file.getPath());
-        assertEquals("godday.txt", file.getName());
+        mock.reset();
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 1);
+
+        assertMockEndpointsSatisfied();
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMaxMessagesPerPollTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java Fri Jun  5 16:51:00 2009
@@ -83,6 +83,9 @@
             idempotentRepository = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
         }
 
+        // set max messages per poll
+        consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+
         configureConsumer(consumer);
         return consumer;
     }

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java (from r781916, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMultipleDirectoriesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMultipleDirectoriesTest.java&r1=781916&r2=782062&rev=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMultipleDirectoriesTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerMaxMessagesPerPollTest.java Fri Jun  5 16:51:00 2009
@@ -16,54 +16,45 @@
  */
 package org.apache.camel.component.file.remote;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version $Revision$
  */
-public class FtpConsumerMultipleDirectoriesTest extends FtpServerTestSupport {
+public class FtpConsumerMaxMessagesPerPollTest extends FtpServerTestSupport {
 
     private String getFtpUrl() {
-        return "ftp://admin@localhost:" + getPort() + "/multidir/?password=admin&recursive=true&consumer.delay=5000&sortBy=file:path";
+        return "ftp://admin@localhost:" + getPort() + "/poll/?password=admin&delay=3000&delete=true&sortBy=file:name&maxMessagesPerPoll=2";
     }
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-        deleteDirectory(FTP_ROOT_DIR + "multidir");
+        deleteDirectory(FTP_ROOT_DIR + "poll");
         prepareFtpServer();
     }
 
     private void prepareFtpServer() throws Exception {
         sendFile(getFtpUrl(), "Bye World", "bye.txt");
-        sendFile(getFtpUrl(), "Hello World", "sub/hello.txt");
-        sendFile(getFtpUrl(), "Godday World", "sub/sub2/godday.txt");
+        sendFile(getFtpUrl(), "Hello World", "hello.txt");
+        sendFile(getFtpUrl(), "Godday World", "godday.txt");
     }
 
-    public void testMultiDir() throws Exception {
+    public void testMaxMessagesPerPoll() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Bye World", "Hello World", "Godday World");
+        mock.expectedBodiesReceived("Bye World", "Godday World");
+        mock.setResultWaitTime(2000);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
 
         assertMockEndpointsSatisfied();
 
-        RemoteFileExchange exchange = (RemoteFileExchange) mock.getExchanges().get(0);
-        RemoteFile file = (RemoteFile) exchange.getGenericFile();
-        assertDirectoryEquals("multidir/bye.txt", file.getAbsoluteFilePath());
-        assertDirectoryEquals("bye.txt", file.getRelativeFilePath());
-        assertEquals("bye.txt", file.getFileName());
-
-        exchange = (RemoteFileExchange) mock.getExchanges().get(1);
-        file = (RemoteFile) exchange.getGenericFile();
-        assertDirectoryEquals("multidir/sub/hello.txt", file.getAbsoluteFilePath());
-        assertDirectoryEquals("sub/hello.txt", file.getRelativeFilePath());
-        assertEquals("hello.txt", file.getFileName());
-
-        exchange = (RemoteFileExchange) mock.getExchanges().get(2);
-        file = (RemoteFile) exchange.getGenericFile();
-        assertDirectoryEquals("multidir/sub/sub2/godday.txt", file.getAbsoluteFilePath());
-        assertDirectoryEquals("sub/sub2/godday.txt", file.getRelativeFilePath());
-        assertEquals("godday.txt", file.getFileName());
+        mock.reset();
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 1);
+
+        assertMockEndpointsSatisfied();
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {

Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java (original)
+++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java Fri Jun  5 16:51:00 2009
@@ -37,6 +37,7 @@
     private boolean useTransactions;
     private String statement;
     private StatementType statementType;
+    private int maxMessagesPerPoll;
 
     public IBatisEndpoint() {
     }
@@ -64,6 +65,7 @@
     @Override
     public IBatisPollingConsumer createConsumer(Processor processor) throws Exception {
         IBatisPollingConsumer consumer = new IBatisPollingConsumer(this, processor);
+        consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         configureConsumer(consumer);
         return consumer;
     }
@@ -125,4 +127,12 @@
     public void setStatementType(StatementType statementType) {
         this.statementType = statementType;
     }
+
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
 }

Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java (original)
+++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java Fri Jun  5 16:51:00 2009
@@ -120,6 +120,8 @@
      */
     private boolean routeEmptyResultSet;
 
+    private int maxMessagesPerPoll;
+
     public IBatisPollingConsumer(IBatisEndpoint endpoint, Processor processor) throws Exception {
         super(endpoint, processor);
     }
@@ -165,10 +167,21 @@
         processBatch(answer);
     }
 
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
     public void processBatch(Queue exchanges) throws Exception {
         final IBatisEndpoint endpoint = getEndpoint();
 
         int total = exchanges.size();
+
+        // limit if needed
+        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+            LOG.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
+            total = maxMessagesPerPoll;
+        }
+
         for (int index = 0; index < total && isRunAllowed(); index++) {
             // only loop if we are started (allowed to run)
             DataHolder holder = (DataHolder) exchanges.poll();

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Fri Jun  5 16:51:00 2009
@@ -49,6 +49,7 @@
     private String query;
     private String namedQuery;
     private String nativeQuery;
+    private int maxMessagesPerPoll;
 
     private final class DataHolder {
         private Exchange exchange;
@@ -92,12 +93,23 @@
         });
     }
 
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
     public void processBatch(Queue exchanges) throws Exception {
         if (exchanges.isEmpty()) {
             return;
         }
 
         int total = exchanges.size();
+
+        // limit if needed
+        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+            LOG.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
+            total = maxMessagesPerPoll;
+        }
+
         for (int index = 0; index < total && isRunAllowed(); index++) {
             // only loop if we are started (allowed to run)
             DataHolder holder = (DataHolder) exchanges.poll();

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java Fri Jun  5 16:51:00 2009
@@ -52,6 +52,7 @@
     private boolean consumeDelete = true;
     private boolean consumeLockEntity = true;
     private boolean flushOnSend = true;
+    private int maxMessagesPerPoll;
 
     public JpaEndpoint() {
     }
@@ -85,6 +86,7 @@
     public Consumer createConsumer(Processor processor) throws Exception {
         validate();
         JpaConsumer consumer = new JpaConsumer(this, processor);
+        consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         configureConsumer(consumer);
         return consumer;
     }
@@ -213,6 +215,14 @@
         this.flushOnSend = flushOnSend;
     }
 
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 

Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Fri Jun  5 16:51:00 2009
@@ -48,6 +48,7 @@
     private final JavaMailSenderImpl sender;
     private Folder folder;
     private Store store;
+    private int maxMessagesPerPoll;
 
     public MailConsumer(MailEndpoint endpoint, Processor processor, JavaMailSenderImpl sender) {
         super(endpoint, processor);
@@ -127,8 +128,19 @@
         }
     }
 
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
     public void processBatch(Queue exchanges) throws Exception {
         int total = exchanges.size();
+
+        // limit if needed
+        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+            LOG.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
+            total = maxMessagesPerPoll;
+        }
+
         for (int index = 0; index < total && isRunAllowed(); index++) {
             // only loop if we are started (allowed to run)
             MailExchange exchange = (MailExchange) exchanges.poll();

Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java?rev=782062&r1=782061&r2=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java Fri Jun  5 16:51:00 2009
@@ -39,6 +39,7 @@
     private MailConfiguration configuration;
     private HeaderFilterStrategy headerFilterStrategy = new DefaultHeaderFilterStrategy();
     private ContentTypeResolver contentTypeResolver;
+    private int maxMessagesPerPoll;
 
     public MailEndpoint() {
     }
@@ -88,12 +89,13 @@
         // ScheduledPollConsumer default delay is 500 millis and that is too often for polling a mailbox,
         // so we override with a new default value. End user can override this value by providing a consumer.delay parameter
         answer.setDelay(MailConsumer.DEFAULT_CONSUMER_DELAY);
+
+        answer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         configureConsumer(answer);
 
         return answer;
     }
 
-
     @Override
     public Exchange createExchange(ExchangePattern pattern) {
         return new MailExchange(this, pattern, getBinding());
@@ -147,4 +149,12 @@
     public void setContentTypeResolver(ContentTypeResolver contentTypeResolver) {
         this.contentTypeResolver = contentTypeResolver;
     }
+
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
 }

Copied: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java (from r781916, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java?p2=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java&p1=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java&r1=781916&r2=782062&rev=782062&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java Fri Jun  5 16:51:00 2009
@@ -31,28 +31,26 @@
 /**
  * Unit test for batch consumer.
  */
-public class MailBatchConsumerTest extends ContextTestSupport {
+public class MailMaxMessagesPerPollTest extends ContextTestSupport {
 
     public void testBatchConsumer() throws Exception {
         prepareMailbox();
-        Mailbox mailbox = Mailbox.get("jones@localhost");
-        assertEquals(5, mailbox.size());
 
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(5);
-        mock.expectsAscending(body());
-        mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
-        mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
-        mock.message(2).property(Exchange.BATCH_INDEX).isEqualTo(2);
-        mock.message(3).property(Exchange.BATCH_INDEX).isEqualTo(3);
-        mock.message(4).property(Exchange.BATCH_INDEX).isEqualTo(4);
-        mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(4).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
-        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5);
+        mock.setResultWaitTime(2000);
+        mock.expectedMessageCount(3);
+        mock.message(0).body().isEqualTo("Message 0");
+        mock.message(1).body().isEqualTo("Message 1");
+        mock.message(2).body().isEqualTo("Message 2");
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 3);
+
+        assertMockEndpointsSatisfied();
+
+        mock.reset();
+        mock.expectedMessageCount(2);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
+        mock.message(0).body().isEqualTo("Message 3");
+        mock.message(1).body().isEqualTo("Message 4");
 
         assertMockEndpointsSatisfied();
     }
@@ -80,7 +78,7 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("pop3://jones@localhost?password=secret&consumer.delay=5000"
+                from("pop3://jones@localhost?password=secret&consumer.delay=3000&maxMessagesPerPoll=3"
                     + "&delete=true").to("mock:result");
             }
         };



Mime
View raw message