From camel-commits-return-1608-apmail-activemq-camel-commits-archive=activemq.apache.org@activemq.apache.org Tue Mar 04 18:58:37 2008 Return-Path: Delivered-To: apmail-activemq-camel-commits-archive@locus.apache.org Received: (qmail 89549 invoked from network); 4 Mar 2008 18:58:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Mar 2008 18:58:37 -0000 Received: (qmail 50169 invoked by uid 500); 4 Mar 2008 18:58:33 -0000 Delivered-To: apmail-activemq-camel-commits-archive@activemq.apache.org Received: (qmail 50155 invoked by uid 500); 4 Mar 2008 18:58:33 -0000 Mailing-List: contact camel-commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: camel-dev@activemq.apache.org Delivered-To: mailing list camel-commits@activemq.apache.org Received: (qmail 50146 invoked by uid 99); 4 Mar 2008 18:58:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Mar 2008 10:58:33 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Mar 2008 18:58:06 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B89EA1A9832; Tue, 4 Mar 2008 10:58:15 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r633584 - in /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset: DataSet.java DataSetConsumer.java DataSetEndpoint.java DataSetSupport.java Date: Tue, 04 Mar 2008 18:58:14 -0000 To: camel-commits@activemq.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080304185815.B89EA1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jstrachan Date: Tue Mar 4 10:58:11 2008 New Revision: 633584 URL: http://svn.apache.org/viewvc?rev=633584&view=rev Log: improvements to the DataSet support to allow custom delays to be used when sending or consuming messages; also added a report group so that for massive tests we can report every N messages so we can see the progress Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java?rev=633584&r1=633583&r2=633584&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java Tue Mar 4 10:58:11 2008 @@ -43,4 +43,9 @@ * Asserts that the expected message has been received for the given index */ void assertMessageExpected(DataSetEndpoint dataSetEndpoint, Exchange expected, Exchange actual, long index) throws Exception; + + /** + * Returns the number of messages which should be received before reporting on the progress of the test + */ + long getReportCount(); } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=633584&r1=633583&r2=633584&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java Tue Mar 4 10:58:11 2008 @@ -41,6 +41,11 @@ for (long i = 0; i < dataSet.getSize(); i++) { Exchange exchange = endpoint.createExchange(i); getProcessor().process(exchange); + + long delay = endpoint.getProduceDelay(); + if (delay > 0) { + Thread.sleep(delay); + } } } } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=633584&r1=633583&r2=633584&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Tue Mar 4 10:58:11 2008 @@ -43,6 +43,9 @@ private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class); private DataSet dataSet; private AtomicInteger receivedCounter = new AtomicInteger(); + private long produceDelay = -1; + private long consumeDelay = -1; + private long startTime; public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) { if (!ObjectHelper.equal(expected, actual)) { @@ -90,6 +93,10 @@ } + // Properties + //------------------------------------------------------------------------- + + public DataSet getDataSet() { return dataSet; } @@ -98,8 +105,39 @@ this.dataSet = dataSet; } + public long getConsumeDelay() { + return consumeDelay; + } + + /** + * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers + */ + public void setConsumeDelay(long consumeDelay) { + this.consumeDelay = consumeDelay; + } + + public long getProduceDelay() { + return produceDelay; + } + + /** + * Allows a delay to be specified which causes producers to pause - to simpulate slow producers + */ + public void setProduceDelay(long produceDelay) { + this.produceDelay = produceDelay; + } + + + // Implementation methods + //------------------------------------------------------------------------- + + + @Override protected void performAssertions(Exchange actual) throws Exception { + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } int receivedCount = receivedCounter.incrementAndGet(); long index = receivedCount - 1; Exchange expected = createExchange(index); @@ -108,6 +146,23 @@ LOG.debug("Received message: " + index + " = " + actual); assertMessageExpected(index, expected, actual); + + if (consumeDelay > 0) { + Thread.sleep(consumeDelay); + } + + long group = getDataSet().getReportCount(); + if (receivedCount % group == 0) { + reportProgress(actual, receivedCount); + } + } + + protected void reportProgress(Exchange actual, int receivedCount) { + long time = System.currentTimeMillis(); + long elapsed = time - startTime; + startTime = time; + + LOG.info("Received: " + receivedCount + " messages so far. Last group took: " + elapsed + " millis"); } protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception { Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java?rev=633584&r1=633583&r2=633584&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java Tue Mar 4 10:58:11 2008 @@ -32,6 +32,7 @@ private Map defaultHeaders; private Processor outputTransformer; private long size = 10; + private long reportCount = -1; public DataSetSupport() { } @@ -61,6 +62,9 @@ DataSetEndpoint.assertEquals("message body", expectedBody, actualBody, actual); } + // Properties + //------------------------------------------------------------------------- + public long getSize() { return size; } @@ -69,6 +73,20 @@ this.size = size; } + public long getReportCount() { + if (reportCount <= 0) { + reportCount = getSize() / 5; + } + return reportCount; + } + + /** + * Sets the number of messages in a group on which we will report that messages have been received. + */ + public void setReportCount(long reportCount) { + this.reportCount = reportCount; + } + public Map getDefaultHeaders() { if (defaultHeaders == null) { defaultHeaders = new HashMap(); @@ -88,6 +106,9 @@ public void setOutputTransformer(Processor outputTransformer) { this.outputTransformer = outputTransformer; } + + // Implementation methods + //------------------------------------------------------------------------- protected abstract Object createMessageBody(long messageIndex);