Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 90939 invoked from network); 8 Jun 2009 05:48:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 8 Jun 2009 05:48:57 -0000 Received: (qmail 62799 invoked by uid 500); 8 Jun 2009 05:49:09 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 62749 invoked by uid 500); 8 Jun 2009 05:49:09 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 62740 invoked by uid 99); 8 Jun 2009 05:49:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Jun 2009 05:49:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Jun 2009 05:49:04 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9CB7723888D1; Mon, 8 Jun 2009 05:48:43 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r782534 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/idempotent/ camel-core/src/main/java/org/apache/camel/... Date: Mon, 08 Jun 2009 05:48:42 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090608054843.9CB7723888D1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Mon Jun 8 05:48:40 2009 New Revision: 782534 URL: http://svn.apache.org/viewvc?rev=782534&view=rev Log: CAMEL-1650: Added confirm to idempotent repository. Also add eager option to be able to disable eager and add to the repo only when the exchange is done. Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java (contents, props changed) - copied, changed from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Jun 8 05:48:40 2009 @@ -206,13 +206,13 @@ protected boolean isValidFile(GenericFile file, boolean isDirectory) { if (!isMatched(file, isDirectory)) { if (log.isTraceEnabled()) { - log.trace("Remote file did not match. Will skip this remote file: " + file); + log.trace("File did not match. Will skip this file: " + file); } return false; } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) { // only use the filename as the key as the file could be moved into a done folder if (log.isTraceEnabled()) { - log.trace("RemoteFileConsumer is idempotent and the file has been consumed before. Will skip this remote file: " + file); + log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file); } return false; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Mon Jun 8 05:48:40 2009 @@ -39,6 +39,8 @@ public class IdempotentConsumerDefinition extends ExpressionNode { @XmlAttribute private String messageIdRepositoryRef; + @XmlAttribute + private Boolean eager = Boolean.TRUE; @XmlTransient private IdempotentRepository idempotentRepository; @@ -92,6 +94,19 @@ return this; } + /** + * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange + * is complete. Eager is default enabled. + * + * @param eager true to add the key before processing, false to wait until + * the exchange is complete. + * @return builder + */ + public IdempotentConsumerDefinition eager(boolean eager) { + setEager(eager); + return this; + } + public String getMessageIdRepositoryRef() { return messageIdRepositoryRef; } @@ -108,12 +123,20 @@ this.idempotentRepository = idempotentRepository; } + public Boolean isEager() { + return eager; + } + + public void setEager(Boolean eager) { + this.eager = eager; + } + @Override public Processor createProcessor(RouteContext routeContext) throws Exception { Processor childProcessor = routeContext.createProcessor(this); IdempotentRepository idempotentRepository = resolveMessageIdRepository(routeContext); - return new IdempotentConsumer(getExpression().createExpression(routeContext), idempotentRepository, - childProcessor); + Expression expression = getExpression().createExpression(routeContext); + return new IdempotentConsumer(expression, idempotentRepository, eager, childProcessor); } /** Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Mon Jun 8 05:48:40 2009 @@ -148,6 +148,11 @@ } } + public boolean confirm(String key) { + // noop + return true; + } + public File getFileStore() { return fileStore; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Mon Jun 8 05:48:40 2009 @@ -31,8 +31,7 @@ /** * An implementation of the Idempotent - * Consumer pattern. + * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer pattern. * * @version $Revision$ */ @@ -41,17 +40,19 @@ private final Expression messageIdExpression; private final Processor processor; private final IdempotentRepository idempotentRepository; + private final boolean eager; - public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository, Processor processor) { + public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository, + boolean eager, Processor processor) { this.messageIdExpression = messageIdExpression; this.idempotentRepository = idempotentRepository; + this.eager = eager; this.processor = processor; } @Override public String toString() { - return "IdempotentConsumer[expression=" + messageIdExpression + ", repository=" + idempotentRepository - + ", processor=" + processor + "]"; + return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]"; } @SuppressWarnings("unchecked") @@ -61,18 +62,26 @@ throw new NoMessageIdException(exchange, messageIdExpression); } - // add the key to the repository - boolean newKey = idempotentRepository.add(messageId); + boolean newKey; + if (eager) { + // add the key to the repository + newKey = idempotentRepository.add(messageId); + } else { + // check if we alrady have the key + newKey = !idempotentRepository.contains(messageId); + } + if (!newKey) { // we already have this key so its a duplicate message onDuplicateMessage(exchange, messageId); - } else { - // register our on completion callback - exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId)); - - // process the exchange - processor.process(exchange); + return; } + + // register our on completion callback + exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager)); + + // process the exchange + processor.process(exchange); } public List next() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java Mon Jun 8 05:48:40 2009 @@ -31,17 +31,17 @@ * @version $Revision$ */ public class IdempotentOnCompletion implements Synchronization { - private static final transient Log LOG = LogFactory.getLog(IdempotentOnCompletion.class); - private final IdempotentRepository idempotentRepository; + private final IdempotentRepository idempotentRepository; private final String messageId; + private final boolean eager; - public IdempotentOnCompletion(IdempotentRepository idempotentRepository, String messageId) { + public IdempotentOnCompletion(IdempotentRepository idempotentRepository, String messageId, boolean eager) { this.idempotentRepository = idempotentRepository; this.messageId = messageId; + this.eager = eager; } - @SuppressWarnings("unchecked") public void onComplete(Exchange exchange) { onCompletedMessage(exchange, messageId); } @@ -57,8 +57,13 @@ * @param exchange the exchange * @param messageId the message ID of this exchange */ + @SuppressWarnings("unchecked") protected void onCompletedMessage(Exchange exchange, String messageId) { - // noop + if (!eager) { + // if not eager we should add the key when its complete + idempotentRepository.add(messageId); + } + idempotentRepository.confirm(messageId); } /** @@ -68,6 +73,7 @@ * @param exchange the exchange * @param messageId the message ID of this exchange */ + @SuppressWarnings("unchecked") protected void onFailedMessage(Exchange exchange, String messageId) { idempotentRepository.remove(messageId); if (LOG.isDebugEnabled()) { @@ -79,4 +85,5 @@ public String toString() { return "IdempotentOnCompletion[" + messageId + ']'; } + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Mon Jun 8 05:48:40 2009 @@ -94,6 +94,11 @@ } } + public boolean confirm(String key) { + // noop + return true; + } + public Map getCache() { return cache; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java Mon Jun 8 05:48:40 2009 @@ -44,9 +44,20 @@ /** * Removes the key from the repository. + *

+ * Is usually invoked if the exchange failed. * * @param key the key of the message for duplicate test * @return true if the key was removed */ boolean remove(E key); + + /** + * Confirms the key, after the exchange has been processed sucesfully. + * + * @param key the key of the message for duplicate test + * @return true if the key was confirmed + */ + boolean confirm(E key); + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Mon Jun 8 05:48:40 2009 @@ -99,6 +99,10 @@ public boolean remove(String key) { return true; } + + public boolean confirm(String key) { + return true; + } } } \ No newline at end of file Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java (from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=781916&r2=782534&rev=782534&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java Mon Jun 8 05:48:40 2009 @@ -24,11 +24,12 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.IdempotentRepository; /** * @version $Revision$ */ -public class IdempotentConsumerTest extends ContextTestSupport { +public class IdempotentConsumerEagerTest extends ContextTestSupport { protected Endpoint startEndpoint; protected MockEndpoint resultEndpoint; @@ -43,7 +44,7 @@ public void configure() throws Exception { from("direct:start").idempotentConsumer( header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200) - ).to("mock:result"); + ).eager(false).to("mock:result"); } }); context.start(); @@ -68,7 +69,7 @@ from("direct:start").idempotentConsumer( header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200) - ).process(new Processor() { + ).eager(false).process(new Processor() { public void process(Exchange exchange) throws Exception { String id = exchange.getIn().getHeader("messageId", String.class); if (id.equals("2")) { @@ -94,6 +95,60 @@ assertMockEndpointsSatisfied(); } + public void testNotEager() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + final IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); + + from("direct:start").idempotentConsumer(header("messageId"), repo).eager(false). + process(new Processor() { + public void process(Exchange exchange) throws Exception { + String id = exchange.getIn().getHeader("messageId", String.class); + // should not contain + assertFalse("Should not eager add to repo", repo.contains(id)); + } + }).to("mock:result"); + } + }); + context.start(); + + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + public void testEager() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + final IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); + + from("direct:start").idempotentConsumer(header("messageId"), repo).eager(true). + process(new Processor() { + public void process(Exchange exchange) throws Exception { + String id = exchange.getIn().getHeader("messageId", String.class); + // should contain + assertTrue("Should eager add to repo", repo.contains(id)); + } + }).to("mock:result"); + } + }); + context.start(); + + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + protected void sendMessage(final Object messageId, final Object body) { template.send(startEndpoint, new Processor() { public void process(Exchange exchange) { @@ -113,4 +168,4 @@ resultEndpoint = getMockEndpoint("mock:result"); } -} +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=782534&r1=782533&r2=782534&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original) +++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Mon Jun 8 05:48:40 2009 @@ -115,4 +115,9 @@ return rc.booleanValue(); } + public boolean confirm(String s) { + // noop + return true; + } + }