camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/4] camel git commit: CAMEL-7787: Multicast - Should defer UoW done until after the aggregate has been done. Thanks to Franz Forsthofer for the patch.
Date Sun, 21 Dec 2014 15:10:59 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.14.x 5c6720713 -> 0b53bd2f1
  refs/heads/master 15b2d886d -> 4dfdc8414


CAMEL-7787: Multicast - Should defer UoW done until after the aggregate has been done. Thanks
to Franz Forsthofer for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/76456bec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/76456bec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/76456bec

Branch: refs/heads/master
Commit: 76456becfc407306127062c7a3b5a08f45e6acc7
Parents: 15b2d88
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun Dec 21 14:18:05 2014 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sun Dec 21 14:18:15 2014 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/camel/Exchange.java    |   1 +
 .../converter/stream/CachedOutputStream.java    |  21 +++-
 .../camel/processor/MulticastProcessor.java     |   9 ++
 .../org/apache/camel/processor/Splitter.java    |  10 ++
 .../MultiCastStreamCachingInSubRouteTest.java   | 120 ++++++++++++++++++
 .../SplitterStreamCachingInSubRouteTest.java    | 126 +++++++++++++++++++
 6 files changed, 284 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/76456bec/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 317cbfc..0d4165b 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -167,6 +167,7 @@ public interface Exchange {
     String OVERRULE_FILE_NAME = "CamelOverruleFileName";
 
     String PARENT_UNIT_OF_WORK = "CamelParentUnitOfWork";
+    String STREAM_CACHE_UNIT_OF_WORK = "CamelStreamCacheUnitOfWork";
     
     String RECIPIENT_LIST_ENDPOINT = "CamelRecipientListEndpoint";
     String RECEIVED_TIMESTAMP      = "CamelReceivedTimestamp";

http://git-wip-us.apache.org/repos/asf/camel/blob/76456bec/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index 63cedc3..616b2ed 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -25,11 +25,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.GeneralSecurityException;
+
 import javax.crypto.CipherOutputStream;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.spi.StreamCachingStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
@@ -78,7 +81,7 @@ public class CachedOutputStream extends OutputStream {
         currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize());
         if (closedOnCompletion) {
             // add on completion so we can cleanup after the exchange is done such as deleting
temporary files
-            exchange.addOnCompletion(new SynchronizationAdapter() {
+            Synchronization onCompletion = new SynchronizationAdapter() {
                 @Override
                 public void onDone(Exchange exchange) {
                     try {
@@ -95,12 +98,24 @@ public class CachedOutputStream extends OutputStream {
                         LOG.warn("Error closing streams. This exception will be ignored.",
e);
                     }
                 }
-        
+
                 @Override
                 public String toString() {
                     return "OnCompletion[CachedOutputStream]";
                 }
-            });
+            };
+
+            UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK,
UnitOfWork.class);
+            if (streamCacheUnitOfWork != null) {
+                // The stream cache must sometimes not be closed when the exchange is deleted.
This is for example the
+                // case in the splitter and multi-cast case with AggregationStrategy where
the result of the sub-routes
+                // are aggregated later in the main route. Here, the cached streams of the
sub-routes must be closed with
+                // the Unit of Work of the main route.
+                streamCacheUnitOfWork.addSynchronization(onCompletion);
+            } else {
+                // add on completion so we can cleanup after the exchange is done such as
deleting temporary files
+                exchange.addOnCompletion(onCompletion);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/76456bec/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 38e70bb..6c1a54c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -864,6 +864,15 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
             // copy exchange, and do not share the unit of work
             Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
 
+            // If the multi-cast processor has an aggregation strategy
+            // then the StreamCache created by the child routes must not be 
+            // closed by the unit of work of the child route, but by the unit of 
+            // work of the parent route or grand parent route or grand grand parent route
...(in case of nesting).
+            // Set therefore the unit of work of the  parent route as stream cache unit of
work, 
+            // if it is not already set.
+            if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+                copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
+            }
             // if we share unit of work, we need to prepare the child exchange
             if (isShareUnitOfWork()) {
                 prepareSharedUnitOfWork(copy, exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/76456bec/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 314de20..ec9b258 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -139,8 +139,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor,
Trac
         final Iterator<?> iterator;
         private final Exchange copy;
         private final RouteContext routeContext;
+        private final Exchange original;
 
         private SplitterIterable(Exchange exchange, Object value) {
+            this.original = exchange;
             this.value = value;
             this.iterator = ObjectHelper.createIterator(value);
             this.copy = copyExchangeNoAttachments(exchange, true);
@@ -177,6 +179,14 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor,
Trac
                     // create a correlated copy as the new exchange to be routed in the splitter
from the copy
                     // and do not share the unit of work
                     Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+                    // If the splitter has an aggregation strategy
+                    // then the StreamCache created by the child routes must not be 
+                    // closed by the unit of work of the child route, but by the unit of

+                    // work of the parent route or grand parent route or grand grand parent
route... (in case of nesting).
+                    // Therefore, set the unit of work of the parent route as stream cache
unit of work, if not already set.
+                    if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null)
{
+                        newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
+                    }
                     // if we share unit of work, we need to prepare the child exchange
                     if (isShareUnitOfWork()) {
                         prepareSharedUnitOfWork(newExchange, copy);

http://git-wip-us.apache.org/repos/asf/camel/blob/76456bec/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
b/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
new file mode 100644
index 0000000..aa8da23
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MultiCastStreamCachingInSubRouteTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+public class MultiCastStreamCachingInSubRouteTest extends ContextTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setStreamCaching(true);
+                context.getStreamCachingStrategy().setEnabled(true);
+                context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
+                context.getStreamCachingStrategy().setSpoolThreshold(1L);
+
+                from("direct:start").multicast(new InternalAggregationStrategy()).to("direct:a",
"direct:b").end().to("mock:result");
+
+                from("direct:startNestedMultiCast").multicast(new InternalAggregationStrategy()).to("direct:start").end()
+                        .to("mock:resultNested");
+
+                from("direct:a") //
+                        .process(new InputProcessorWithStreamCache(1)) //
+                        .to("mock:resulta");
+
+                from("direct:b") //
+                        .process(new InputProcessorWithStreamCache(2)) //
+                        .to("mock:resultb");
+            }
+        };
+    }
+
+    public void testWithAggregationStrategyAndStreamCacheInSubRoute() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Test Message 1Test Message 2");
+        template.sendBody("direct:start", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testNestedMultiCastWithCachedStreamInAggregationStrategy() throws Exception
{
+        MockEndpoint mock = getMockEndpoint("mock:resultNested");
+        mock.expectedBodiesReceived("Test Message 1Test Message 2");
+        template.sendBody("direct:startNestedMultiCast", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public static class InputProcessorWithStreamCache implements Processor {
+
+        private final int number;
+
+        public InputProcessorWithStreamCache(int number) {
+            this.number = number;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+
+            CachedOutputStream cos = new CachedOutputStream(exchange);
+            String s = "Test Message " + number;
+            cos.write(s.getBytes(Charset.forName("UTF-8")));
+            cos.close();
+            InputStream is = (InputStream) cos.newStreamCache();
+            exchange.getOut().setBody(is);
+
+        }
+    }
+
+    public static class InternalAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            try {
+                String oldBody = oldExchange.getIn().getBody(String.class);
+                String newBody = newExchange.getIn().getBody(String.class);
+                String merged = oldBody + newBody;
+                //also do stream caching in the aggregation strategy            
+                CachedOutputStream cos = new CachedOutputStream(newExchange);
+                cos.write(merged.getBytes("UTF-8"));
+                cos.close();
+                oldExchange.getIn().setBody(cos.newStreamCache());
+                return oldExchange;
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/76456bec/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
new file mode 100644
index 0000000..84d7700
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCachingInSubRouteTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+public class SplitterStreamCachingInSubRouteTest extends ContextTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setStreamCaching(true);
+                context.getStreamCachingStrategy().setEnabled(true);
+                context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
+                context.getStreamCachingStrategy().setSpoolThreshold(1L);
+
+                from("direct:startIterable").split(body().tokenize(",")).streaming().aggregationStrategy(new
InternalAggregationStrategy())
+                        .stopOnException().parallelProcessing().to("direct:sub").end().to("mock:result");
+
+                from("direct:start").split(body().tokenize(",")).aggregationStrategy(new
InternalAggregationStrategy()).stopOnException()
+                        .parallelProcessing().to("direct:sub").end().to("mock:result");
+
+                from("direct:sub").process(new InputProcessorWithStreamCache(22)).to("mock:resultsub");
+
+                from("direct:startNested").split(body().tokenize(",")).aggregationStrategy(new
InternalAggregationStrategy())
+                        .stopOnException().parallelProcessing().to("direct:start").end().to("mock:resultNested");
+            }
+
+        };
+    }
+
+    public void testWithAggregationStategyAndStreamCacheInSubRoute() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Test Message 22");
+        template.sendBody("direct:start", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testStreamCacheIterableSplitter() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Test Message 22");
+        template.sendBody("direct:startIterable", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testNested() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resultNested");
+        mock.expectedBodiesReceived("Test Message 22");
+        template.sendBody("direct:startNested", "<start></start>");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public static class InputProcessorWithStreamCache implements Processor {
+
+        private final int number;
+
+        public InputProcessorWithStreamCache(int number) {
+            this.number = number;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+
+            CachedOutputStream cos = new CachedOutputStream(exchange);
+            String s = "Test Message " + number;
+            cos.write(s.getBytes(Charset.forName("UTF-8")));
+            cos.close();
+            InputStream is = (InputStream) cos.newStreamCache();
+
+            exchange.getOut().setBody(is);
+        }
+    }
+
+    public static class InternalAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            try {
+                String oldBody = oldExchange.getIn().getBody(String.class);
+                String newBody = newExchange.getIn().getBody(String.class);
+                String merged = oldBody + newBody;
+                //also do stream caching in the aggregation strategy            
+                CachedOutputStream cos = new CachedOutputStream(newExchange);
+                cos.write(merged.getBytes("UTF-8"));
+                cos.close();
+                oldExchange.getIn().setBody(cos.newStreamCache());
+                return oldExchange;
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+}


Mime
View raw message