Return-Path: Delivered-To: apmail-activemq-camel-commits-archive@locus.apache.org Received: (qmail 2004 invoked from network); 4 Apr 2008 12:36:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Apr 2008 12:36:51 -0000 Received: (qmail 31161 invoked by uid 500); 4 Apr 2008 12:36:51 -0000 Delivered-To: apmail-activemq-camel-commits-archive@activemq.apache.org Received: (qmail 31141 invoked by uid 500); 4 Apr 2008 12:36:51 -0000 Mailing-List: contact camel-commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: camel-dev@activemq.apache.org Delivered-To: mailing list camel-commits@activemq.apache.org Received: (qmail 31132 invoked by uid 99); 4 Apr 2008 12:36:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2008 05:36:51 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2008 12:36:18 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B26951A9838; Fri, 4 Apr 2008 05:36:30 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r644696 - in /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset: DataSetConsumer.java DataSetEndpoint.java Date: Fri, 04 Apr 2008 12:36:30 -0000 To: camel-commits@activemq.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080404123630.B26951A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jstrachan Date: Fri Apr 4 05:36:27 2008 New Revision: 644696 URL: http://svn.apache.org/viewvc?rev=644696&view=rev Log: by default lets use a reporter to dump the progress of sending messages Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=644696&r1=644695&r2=644696&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java Fri Apr 4 05:36:27 2008 @@ -18,6 +18,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.processor.ThroughputLogger; import org.apache.camel.impl.DefaultConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,6 +29,7 @@ public class DataSetConsumer extends DefaultConsumer { private static final transient Log LOG = LogFactory.getLog(DataSetConsumer.class); private DataSetEndpoint endpoint; + private Processor reporter; public DataSetConsumer(DataSetEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -38,17 +40,24 @@ protected void doStart() throws Exception { super.doStart(); + if (reporter == null) { + reporter = createReporter(); + } + final DataSet dataSet = endpoint.getDataSet(); + final long preloadSize = endpoint.getPreloadSize(); + + sendMessages(0, preloadSize); + endpoint.getExecutorService().execute(new Runnable() { public void run() { - sendMessages(); + sendMessages(preloadSize, dataSet.getSize()); } }); } - protected void sendMessages() { + protected void sendMessages(long startIndex, long endIndex) { try { - DataSet dataSet = endpoint.getDataSet(); - for (long i = 0; i < dataSet.getSize(); i++) { + for (long i = startIndex; i < endIndex; i++) { Exchange exchange = endpoint.createExchange(i); getProcessor().process(exchange); @@ -60,9 +69,18 @@ LOG.debug(e); } } + if (reporter != null) { + reporter.process(exchange); + } } } catch (Exception e) { LOG.error(e); } + } + + protected ThroughputLogger createReporter() { + ThroughputLogger answer = new ThroughputLogger(endpoint.getEndpointUri(), (int) endpoint.getDataSet().getReportCount()); + answer.setAction("Sent"); + return answer; } } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=644696&r1=644695&r2=644696&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Fri Apr 4 05:36:27 2008 @@ -42,6 +42,7 @@ private long produceDelay = -1; private long consumeDelay = -1; private long startTime; + private long preloadSize = 0; public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) { super(endpointUri, component); @@ -106,6 +107,17 @@ public void setDataSet(DataSet dataSet) { this.dataSet = dataSet; + } + + public long getPreloadSize() { + return preloadSize; + } + + /** + * Sets how many messages should be preloaded (sent) before the route completes its initialisation + */ + public void setPreloadSize(long preloadSize) { + this.preloadSize = preloadSize; } public long getConsumeDelay() {