camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r640937 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/dataset/ test/java/org/apache/camel/component/dataset/
Date Tue, 25 Mar 2008 18:40:41 GMT
Author: jstrachan
Date: Tue Mar 25 11:40:39 2008
New Revision: 640937

URL: http://svn.apache.org/viewvc?rev=640937&view=rev
Log:
Fixed up the DataSet so that sending the dataset happens in a separate thread (so you can
send multiple datasets concurrently). Also added a minor patch to the assertions to deal with
massive data sets without terminating early (needs a better fix though!).

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=640937&r1=640936&r2=640937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
Tue Mar 25 11:40:39 2008
@@ -19,11 +19,14 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.1 $
  */
 public class DataSetConsumer extends DefaultConsumer<Exchange> {
+    private static final transient Log LOG = LogFactory.getLog(DataSetConsumer.class);
     private DataSetEndpoint endpoint;
 
     public DataSetConsumer(DataSetEndpoint endpoint, Processor processor) {
@@ -35,15 +38,33 @@
     protected void doStart() throws Exception {
         super.doStart();
 
-        DataSet dataSet = endpoint.getDataSet();
-        for (long i = 0; i < dataSet.getSize(); i++) {
-            Exchange exchange = endpoint.createExchange(i);
-            getProcessor().process(exchange);
-
-            long delay = endpoint.getProduceDelay();
-            if (delay > 0) {
-                Thread.sleep(delay);
+        endpoint.getExecutorService().execute(new Runnable() {
+            public void run() {
+                sendMessages();
             }
+        });
+    }
+
+    protected void sendMessages() {
+        try {
+            DataSet dataSet = endpoint.getDataSet();
+            for (long i = 0; i < dataSet.getSize(); i++) {
+                Exchange exchange = endpoint.createExchange(i);
+                getProcessor().process(exchange);
+
+                long delay = endpoint.getProduceDelay();
+                if (delay > 0) {
+                    try {
+                        Thread.sleep(delay);
+                    }
+                    catch (InterruptedException e) {
+                        LOG.debug(e);
+                    }
+                }
+            }
+        }
+        catch (Exception e) {
+            LOG.error(e);
         }
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=640937&r1=640936&r2=640937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
Tue Mar 25 11:40:39 2008
@@ -50,7 +50,7 @@
 
     public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
         if (!ObjectHelper.equal(expected, actual)) {
-            throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on  " + exchange);
+            throw new AssertionError(description + " does not match. Expected: " + expected + " but was: " + actual + " on  " + exchange + " with headers: " + exchange.getIn().getHeaders());
         }
     }
 
@@ -88,6 +88,14 @@
         return exchange;
     }
 
+    @Override
+    protected void waitForCompleteLatch() throws InterruptedException {
+        // TODO lets do a much better version of this!
+        long size = getDataSet().getSize();
+        size *= 4000;
+        setDefaulResultWaitMillis(size);
+        super.waitForCompleteLatch();
+    }
 
     // Properties
     //-------------------------------------------------------------------------
@@ -163,7 +171,7 @@
 
     protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
         long actualCounter = ExchangeHelper.getMandatoryHeader(actual, DataSet.INDEX_HEADER, Long.class);
-        assertEquals(DataSet.INDEX_HEADER, index, actualCounter, actual);
+        assertEquals("Header: " + DataSet.INDEX_HEADER, index, actualCounter, actual);
 
         getDataSet().assertMessageExpected(this, expected, actual, index);
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java?rev=640937&r1=640936&r2=640937&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
Tue Mar 25 11:40:39 2008
@@ -48,9 +48,9 @@
             public void configure() throws Exception {
                 from("dataset:foo").multicast().
                         to("mock:results").
-                        to("seda:foo");
+                        to("direct:foo");
 
-                from("seda:foo").to("dataset:foo");
+                from("direct:foo").to("dataset:foo");
             }
         };
     }



Mime
View raw message