Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 16495 invoked from network); 18 Apr 2007 18:51:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 18 Apr 2007 18:51:59 -0000 Received: (qmail 43242 invoked by uid 500); 18 Apr 2007 18:52:05 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 43224 invoked by uid 500); 18 Apr 2007 18:52:05 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 43215 invoked by uid 99); 18 Apr 2007 18:52:05 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Apr 2007 11:52:05 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Apr 2007 11:51:58 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 509911A9838; Wed, 18 Apr 2007 11:51:38 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r530128 - in /activemq/camel/trunk/camel-core/src/test/java/org/apache/camel: builder/IdempotentConsumerTest.java processor/ processor/IdempotentConsumerTest.java Date: Wed, 18 Apr 2007 18:51:38 -0000 To: commits@activemq.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070418185138.509911A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jstrachan Date: Wed Apr 18 11:51:37 2007 New Revision: 530128 URL: http://svn.apache.org/viewvc?view=rev&rev=530128 Log: added a functional test case for the IdempotentConsumer Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (contents, props changed) - copied, changed from r530102, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java Removed: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (from r530102, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java) URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?view=diff&rev=530128&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java&r1=530102&p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r2=530128 ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Wed Apr 18 11:51:37 2007 @@ -15,10 +15,103 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.builder; +package org.apache.camel.processor; + +import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.processor.idempotent.MemoryMessageIdRepository; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.ProducerCache; +import org.apache.camel.impl.DefaultCamelContext; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; /** * @version $Revision: 1.1 $ */ -public class IdempotentConsumerTest { +public class IdempotentConsumerTest extends TestCase { + private static final transient Log log = LogFactory.getLog(IdempotentConsumerTest.class); + + protected CamelContext container = new DefaultCamelContext(); + protected CountDownLatch latch = new CountDownLatch(3); + protected Endpoint endpoint; + protected ProducerCache client = new ProducerCache(); + protected List receivedBodies = new ArrayList(); + + public void testDuplicateMessagesAreFilteredOut() throws Exception { + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + // lets wait on the message being received + boolean received = latch.await(20, TimeUnit.SECONDS); + assertTrue("Did not receive the message!", received); + + assertEquals("Should have received 3 responses: " + receivedBodies, 3, receivedBodies.size()); + + assertEquals("received bodies", Arrays.asList(new Object[] { "one", "two", "three"}), receivedBodies); + + log.debug("Received bodies: " + receivedBodies); + } + + protected void sendMessage(final Object messageId, final Object body) { + client.send(endpoint, new Processor() { + public void process(Exchange exchange) { + // now lets fire in a message + Message in = exchange.getIn(); + in.setBody(body); + in.setHeader("messageId", messageId); + } + }); + } + + @Override + protected void setUp() throws Exception { + final Processor processor = new Processor() { + public void process(Exchange e) { + Message in = e.getIn(); + String body = in.getBody(String.class); + + log.debug("Received body: " + body + " on exchange: " + e); + + receivedBodies.add(body); + latch.countDown(); + } + }; + final String endpointUri = "queue:test.a"; + + // lets add some routes + container.addRoutes(createRouteBuilder(endpointUri, processor)); + endpoint = container.resolveEndpoint(endpointUri); + assertNotNull("No endpoint found for URI: " + endpointUri, endpoint); + + container.start(); + } + + protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor processor) { + return new RouteBuilder() { + public void configure() { + from(endpointUri).idempotentConsumer(header("messageId"), MemoryMessageIdRepository.memoryMessageIdRepository()).process(processor); + } + }; + } + + @Override + protected void tearDown() throws Exception { + client.stop(); + container.stop(); + } } Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java ------------------------------------------------------------------------------ svn:eol-style = native