camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/2] git commit: CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.
Date Mon, 22 Jul 2013 13:56:08 GMT
CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow
3rd parties to plug in custom strategies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5369bf1c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5369bf1c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5369bf1c

Branch: refs/heads/master
Commit: 5369bf1ca86b74fd8586c60f61d6a049bbe797d7
Parents: b11f4e7
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Jul 22 15:55:57 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Jul 22 15:55:57 2013 +0200

----------------------------------------------------------------------
 .../ManagedStreamCachingStrategyMBean.java      |  14 +-
 .../impl/DefaultStreamCachingStrategy.java      | 133 +++++++++----------
 .../mbean/ManagedStreamCachingStrategy.java     |  16 ++-
 .../apache/camel/spi/StreamCachingStrategy.java |  24 ++--
 .../StreamCachingCustomShouldSpoolRuleTest.java | 118 ++++++++++++++++
 .../StreamCachingCustomShouldSpoolTaskTest.java | 118 ----------------
 .../xml/AbstractCamelContextFactoryBean.java    |  20 +++
 .../CamelStreamCachingStrategyDefinition.java   |  43 +++++-
 8 files changed, 272 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
index ea49ec5..342dc7f 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
@@ -36,6 +36,12 @@ public interface ManagedStreamCachingStrategyMBean {
     @ManagedAttribute(description = "Threshold in bytes when overflow and spooling to disk instead of keeping in memory")
     long getSpoolThreshold();
 
+    @ManagedAttribute(description = "Percentage (1-99) of used heap memory threshold to activate spooling to disk")
+    void setSpoolUsedHeapMemoryThreshold(int percentage);
+
+    @ManagedAttribute(description = "Percentage (1-99) of used heap memory threshold to activate spooling to disk")
+    long getSpoolUsedHeapMemoryThreshold();
+
     @ManagedAttribute(description = "Buffer size in bytes to use when coping between buffers")
     void setBufferSize(int bufferSize);
 
@@ -48,11 +54,11 @@ public interface ManagedStreamCachingStrategyMBean {
     @ManagedAttribute(description = "Whether to remove spool directory when stopping")
     boolean isRemoveSpoolDirectoryWhenStopping();
 
-    @ManagedAttribute(description = "Whether any or all should spool tasks determines should spool")
-    void setAnySpoolTasks(boolean any);
+    @ManagedAttribute(description = "Whether any or all spool rules determines whether to spool")
+    void setAnySpoolRules(boolean any);
 
-    @ManagedAttribute(description = "Whether any or all should spool tasks determines should spool")
-    boolean isAnySpoolTasks();
+    @ManagedAttribute(description = "Whether any or all spool rules determines whether to spool")
+    boolean isAnySpoolRules();
 
     @ManagedAttribute(description = "Number of in-memory StreamCache created")
     long getCacheMemoryCounter();

http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
index ddacc5d..347e8ac 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
@@ -39,8 +39,6 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy {
 
-    // TODO: add easy configuration in XML to add custom should spool tasks
-
     @Deprecated
     public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
     @Deprecated
@@ -57,13 +55,13 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
     private File spoolDirectory;
     private transient String spoolDirectoryName = "${java.io.tmpdir}camel-tmp-#uuid#";
     private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
-    private int spoolHeapMemoryWatermarkThreshold;
+    private int spoolUsedHeapMemoryThreshold;
     private String spoolChiper;
     private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE;
     private boolean removeSpoolDirectoryWhenStopping = true;
     private final UtilizationStatistics statistics = new UtilizationStatistics();
-    private final Set<ShouldSpoolTask> spoolTasks = new LinkedHashSet<ShouldSpoolTask>();
-    private boolean anySpoolTasks;
+    private final Set<SpoolRule> spoolRules = new LinkedHashSet<SpoolRule>();
+    private boolean anySpoolRules;
 
     public CamelContext getCamelContext() {
         return camelContext;
@@ -97,12 +95,12 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         return spoolThreshold;
     }
 
-    public int getSpoolHeapMemoryWatermarkThreshold() {
-        return spoolHeapMemoryWatermarkThreshold;
+    public int getSpoolUsedHeapMemoryThreshold() {
+        return spoolUsedHeapMemoryThreshold;
     }
 
-    public void setSpoolHeapMemoryWatermarkThreshold(int spoolHeapMemoryWatermarkThreshold) {
-        this.spoolHeapMemoryWatermarkThreshold = spoolHeapMemoryWatermarkThreshold;
+    public void setSpoolUsedHeapMemoryThreshold(int spoolHeapMemoryWatermarkThreshold) {
+        this.spoolUsedHeapMemoryThreshold = spoolHeapMemoryWatermarkThreshold;
     }
 
     public void setSpoolThreshold(long spoolThreshold) {
@@ -133,12 +131,12 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         this.removeSpoolDirectoryWhenStopping = removeSpoolDirectoryWhenStopping;
     }
 
-    public boolean isAnySpoolTasks() {
-        return anySpoolTasks;
+    public boolean isAnySpoolRules() {
+        return anySpoolRules;
     }
 
-    public void setAnySpoolTasks(boolean anySpoolTasks) {
-        this.anySpoolTasks = anySpoolTasks;
+    public void setAnySpoolRules(boolean anySpoolTasks) {
+        this.anySpoolRules = anySpoolTasks;
     }
 
     public Statistics getStatistics() {
@@ -146,29 +144,29 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
     }
 
     public boolean shouldSpoolCache(long length) {
-        if (spoolTasks.isEmpty()) {
+        if (spoolRules.isEmpty()) {
             return false;
         }
 
         boolean all = true;
         boolean any = false;
-        for (ShouldSpoolTask task : spoolTasks) {
-            boolean result = task.shouldSpoolCache(length);
+        for (SpoolRule rule : spoolRules) {
+            boolean result = rule.shouldSpoolCache(length);
             if (!result) {
                 all = false;
             } else {
                 any = true;
-                if (anySpoolTasks) {
+                if (anySpoolRules) {
                     // no need to check anymore
                     break;
                 }
             }
         }
-        return anySpoolTasks ? any : all;
+        return anySpoolRules ? any : all;
     }
 
-    public void addShouldSpoolTask(ShouldSpoolTask task) {
-        spoolTasks.add(task);
+    public void addSpoolRule(SpoolRule rule) {
+        spoolRules.add(rule);
     }
 
     public StreamCache cache(Exchange exchange) {
@@ -192,6 +190,26 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         return cache;
     }
 
+    protected String resolveSpoolDirectory(String path) {
+        String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false);
+        if (name != null) {
+            name = customResolveManagementName(name);
+        }
+        // and then check again with invalid check to ensure all ## is resolved
+        if (name != null) {
+            name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true);
+        }
+        return name;
+    }
+
+    protected String customResolveManagementName(String pattern) {
+        if (pattern.contains("#uuid#")) {
+            String uuid = UUID.randomUUID().toString();
+            pattern = pattern.replaceFirst("#uuid#", uuid);
+        }
+        return FilePathResolver.resolvePath(pattern);
+    }
+
     @Override
     protected void doStart() throws Exception {
         if (!enabled) {
@@ -225,12 +243,12 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
             LOG.warn("Configuring of StreamCaching using CamelContext properties is deprecated - use StreamCachingStrategy instead.");
         }
 
-        if (spoolHeapMemoryWatermarkThreshold < 0 || spoolHeapMemoryWatermarkThreshold > 100) {
-            throw new IllegalArgumentException("SpoolHeapMemoryWatermarkThreshold must be a value between 0 and 100, was: " + spoolHeapMemoryWatermarkThreshold);
+        if (spoolUsedHeapMemoryThreshold > 99) {
+            throw new IllegalArgumentException("SpoolHeapMemoryWatermarkThreshold must not be higher than 99, was: " + spoolUsedHeapMemoryThreshold);
         }
 
         // if we can overflow to disk then make sure directory exists / is created
-        if (spoolThreshold > 0 || spoolHeapMemoryWatermarkThreshold > 0) {
+        if (spoolThreshold > 0 || spoolUsedHeapMemoryThreshold > 0) {
 
             if (spoolDirectory == null && spoolDirectoryName == null) {
                 throw new IllegalArgumentException("SpoolDirectory must be configured when using SpoolThreshold > 0");
@@ -263,40 +281,20 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
             }
 
             if (spoolThreshold > 0) {
-                spoolTasks.add(new FixedThresholdShouldSpoolTask(spoolThreshold));
+                spoolRules.add(new FixedThresholdSpoolRule());
             }
-            if (spoolHeapMemoryWatermarkThreshold > 0) {
-                spoolTasks.add(new UsedHeapMemoryShouldSpoolTask(spoolHeapMemoryWatermarkThreshold));
+            if (spoolUsedHeapMemoryThreshold > 0) {
+                spoolRules.add(new UsedHeapMemorySpoolRule());
             }
         }
 
         LOG.debug("StreamCaching configuration {}", this.toString());
 
         if (spoolDirectory != null) {
-            LOG.info("StreamCaching in use with spool directory: {} and thresholds: {}", spoolDirectory.getPath(), spoolTasks.toString());
+            LOG.info("StreamCaching in use with spool directory: {} and rules: {}", spoolDirectory.getPath(), spoolRules.toString());
         } else {
-            LOG.info("StreamCaching in use with thresholds: {}", spoolTasks.toString());
-        }
-    }
-
-    protected String resolveSpoolDirectory(String path) {
-        String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false);
-        if (name != null) {
-            name = customResolveManagementName(name);
-        }
-        // and then check again with invalid check to ensure all ## is resolved
-        if (name != null) {
-            name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true);
+            LOG.info("StreamCaching in use with rules: {}", spoolRules.toString());
         }
-        return name;
-    }
-
-    protected String customResolveManagementName(String pattern) {
-        if (pattern.contains("#uuid#")) {
-            String uuid = UUID.randomUUID().toString();
-            pattern = pattern.replaceFirst("#uuid#", uuid);
-        }
-        return FilePathResolver.resolvePath(pattern);
     }
 
     @Override
@@ -317,55 +315,48 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
     public String toString() {
         return "DefaultStreamCachingStrategy["
             + "spoolDirectory=" + spoolDirectory
-            + ", spoolThreshold=" + spoolThreshold
             + ", spoolChiper=" + spoolChiper
+            + ", spoolThreshold=" + spoolThreshold
+            + ", spoolUsedHeapMemoryThreshold=" + spoolUsedHeapMemoryThreshold
             + ", bufferSize=" + bufferSize
-            + ", anySpoolTasks=" + anySpoolTasks + "]";
+            + ", anySpoolRules=" + anySpoolRules + "]";
     }
 
-    private static final class FixedThresholdShouldSpoolTask implements ShouldSpoolTask {
-
-        private final long threshold;
-
-        private FixedThresholdShouldSpoolTask(long threshold) {
-            this.threshold = threshold;
-        }
+    private final class FixedThresholdSpoolRule implements SpoolRule {
 
         public boolean shouldSpoolCache(long length) {
-            if (threshold > 0 && length > threshold) {
-                LOG.trace("Should spool cache {} > {} -> true", length, threshold);
+            if (spoolThreshold > 0 && length > spoolThreshold) {
+                LOG.trace("Should spool cache {} > {} -> true", length, spoolThreshold);
                 return true;
             }
             return false;
         }
 
         public String toString() {
-            if (threshold < 1024) {
-                return "Spool > " + threshold + " bytes body size";
+            if (spoolThreshold < 1024) {
+                return "Spool > " + spoolThreshold + " bytes body size";
             } else {
-                return "Spool > " + (threshold >> 10) + "K body size";
+                return "Spool > " + (spoolThreshold >> 10) + "K body size";
             }
         }
     }
 
-    private final class UsedHeapMemoryShouldSpoolTask implements ShouldSpoolTask {
+    private final class UsedHeapMemorySpoolRule implements SpoolRule {
 
         private final MemoryMXBean heapUsage;
-        private final int spoolPercentage;
 
-        private UsedHeapMemoryShouldSpoolTask(int spoolPercentage) {
-            this.spoolPercentage = spoolPercentage;
+        private UsedHeapMemorySpoolRule() {
             this.heapUsage = ManagementFactory.getMemoryMXBean();
         }
 
         public boolean shouldSpoolCache(long length) {
-            if (spoolPercentage > 0) {
+            if (spoolUsedHeapMemoryThreshold > 0) {
                 long used = heapUsage.getHeapMemoryUsage().getUsed();
                 long committed = heapUsage.getHeapMemoryUsage().getCommitted();
                 long percentage = committed / used * 100;
                 LOG.trace("Heap memory: [used=%sK (%sK\\%), committed=%sK]", new Object[]{used >> 10, percentage, committed >> 10});
-                if (percentage >= spoolPercentage) {
-                    LOG.trace("Should spool cache {} > {} -> true", percentage, spoolPercentage);
+                if (percentage >= spoolUsedHeapMemoryThreshold) {
+                    LOG.trace("Should spool cache {} > {} -> true", percentage, spoolUsedHeapMemoryThreshold);
                     return true;
                 }
             }
@@ -373,14 +364,14 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         }
 
         public String toString() {
-            return "Spool > " + spoolPercentage + "% used heap memory";
+            return "Spool > " + spoolUsedHeapMemoryThreshold + "% used heap memory";
         }
     }
 
     /**
      * Represents utilization statistics.
      */
-    private final class UtilizationStatistics implements Statistics {
+    private static final class UtilizationStatistics implements Statistics {
 
         private boolean statisticsEnabled;
         private volatile long memoryCounter;

http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
index e831d1f..a76e5dc 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
@@ -66,6 +66,14 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana
         return streamCachingStrategy.getSpoolThreshold();
     }
 
+    public void setSpoolUsedHeapMemoryThreshold(int percentage) {
+        streamCachingStrategy.setSpoolUsedHeapMemoryThreshold(percentage);
+    }
+
+    public long getSpoolUsedHeapMemoryThreshold() {
+        return streamCachingStrategy.getSpoolUsedHeapMemoryThreshold();
+    }
+
     public void setBufferSize(int bufferSize) {
         streamCachingStrategy.setBufferSize(bufferSize);
     }
@@ -82,12 +90,12 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana
         return streamCachingStrategy.isRemoveSpoolDirectoryWhenStopping();
     }
 
-    public void setAnySpoolTasks(boolean any) {
-        streamCachingStrategy.setAnySpoolTasks(any);
+    public void setAnySpoolRules(boolean any) {
+        streamCachingStrategy.setAnySpoolRules(any);
     }
 
-    public boolean isAnySpoolTasks() {
-        return streamCachingStrategy.isAnySpoolTasks();
+    public boolean isAnySpoolRules() {
+        return streamCachingStrategy.isAnySpoolRules();
     }
 
     public long getCacheMemoryCounter() {

http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
index c44eb4d..089ad09 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -81,9 +81,9 @@ public interface StreamCachingStrategy extends Service {
     }
 
     /**
-     * Task for determine if stream caching should be spooled to disk or kept in-memory.
+     * Rule for determine if stream caching should be spooled to disk or kept in-memory.
      */
-    interface ShouldSpoolTask {
+    interface SpoolRule {
 
         /**
          * Determines if the stream should be spooled or not. For example if the stream length is
@@ -130,13 +130,13 @@ public interface StreamCachingStrategy extends Service {
     long getSpoolThreshold();
 
     /**
-     * Sets a percentage (0-100) of used heap memory threshold to activate spooling to disk.
+     * Sets a percentage (1-99) of used heap memory threshold to activate spooling to disk.
      *
      * @param percentage percentage of used heap memory.
      */
-    void setSpoolHeapMemoryWatermarkThreshold(int percentage);
+    void setSpoolUsedHeapMemoryThreshold(int percentage);
 
-    int getSpoolHeapMemoryWatermarkThreshold();
+    int getSpoolUsedHeapMemoryThreshold();
 
     /**
      * Sets the buffer size to use when allocating in-memory buffers used for in-memory stream caches.
@@ -166,16 +166,16 @@ public interface StreamCachingStrategy extends Service {
     boolean isRemoveSpoolDirectoryWhenStopping();
 
     /**
-     * Sets whether if just any of the {@link ShouldSpoolTask}
+     * Sets whether if just any of the {@link org.apache.camel.spi.StreamCachingStrategy.SpoolRule} rules
      * returns <tt>true</tt> then {@link #shouldSpoolCache(long)} returns <tt>true</tt>.
-     * If this option is <tt>false</tt>, then <b>all</b> the {@link ShouldSpoolTask} must
+     * If this option is <tt>false</tt>, then <b>all</b> the {@link org.apache.camel.spi.StreamCachingStrategy.SpoolRule} must
      * return <tt>true</tt>.
      * <p/>
-     * The default value is <tt>false</tt>
+     * The default value is <tt>false</tt> which means that all the rules must return <tt>true</tt>.
      */
-    void setAnySpoolTasks(boolean any);
+    void setAnySpoolRules(boolean any);
 
-    boolean isAnySpoolTasks();
+    boolean isAnySpoolRules();
 
     /**
      * Gets the utilization statistics.
@@ -183,9 +183,9 @@ public interface StreamCachingStrategy extends Service {
     Statistics getStatistics();
 
     /**
-     * Adds the {@link ShouldSpoolTask} to be used.
+     * Adds the {@link org.apache.camel.spi.StreamCachingStrategy.SpoolRule} rule to be used.
      */
-    void addShouldSpoolTask(ShouldSpoolTask task);
+    void addSpoolRule(SpoolRule rule);
 
     /**
      * Determines if the stream should be spooled or not. For example if the stream length is

http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolRuleTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolRuleTest.java b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolRuleTest.java
new file mode 100644
index 0000000..7375e74
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolRuleTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FilterInputStream;
+import java.io.InputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.StreamCachingStrategy;
+
+public class StreamCachingCustomShouldSpoolRuleTest extends ContextTestSupport {
+
+    private MyCustomSpoolRule spoolRule = new MyCustomSpoolRule();
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/cachedir");
+        super.setUp();
+    }
+
+    public void testByteArrayInputStream() throws Exception {
+        getMockEndpoint("mock:english").expectedBodiesReceived("<hello/>");
+        getMockEndpoint("mock:dutch").expectedBodiesReceived("<hallo/>");
+        getMockEndpoint("mock:german").expectedBodiesReceived("<hallo/>");
+        getMockEndpoint("mock:french").expectedBodiesReceived("<hellos/>");
+
+        // need to wrap in MyInputStream as ByteArrayInputStream is optimized to just reuse in memory buffer
+        // and not needed to spool to disk
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hello/>".getBytes())));
+
+        spoolRule.setSpool(true);
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hallo/>".getBytes())));
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hellos/>".getBytes())));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private class MyInputStream extends FilterInputStream {
+
+        private MyInputStream(InputStream in) {
+            super(in);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
+                context.getStreamCachingStrategy().addSpoolRule(spoolRule);
+                context.getStreamCachingStrategy().setAnySpoolRules(true);
+                context.setStreamCaching(true);
+
+                from("direct:a")
+                    .choice()
+                        .when(xpath("//hello")).to("mock:english")
+                        .when(xpath("//hallo")).to("mock:dutch", "mock:german")
+                        .otherwise().to("mock:french")
+                    .end()
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // check if spool file exists
+                            if (spoolRule.isSpool()) {
+                                String[] names = new File("target/cachedir").list();
+                                assertEquals("There should be a cached spool file", 1, names.length);
+                            }
+                        }
+                    });
+
+            }
+        };
+    }
+
+    private static final class MyCustomSpoolRule implements StreamCachingStrategy.SpoolRule {
+
+        private volatile boolean spool;
+
+        @Override
+        public boolean shouldSpoolCache(long length) {
+            return spool;
+        }
+
+        public boolean isSpool() {
+            return spool;
+        }
+
+        public void setSpool(boolean spool) {
+            this.spool = spool;
+        }
+
+        @Override
+        public String toString() {
+            return "MyCustomSpoolRule";
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java
deleted file mode 100644
index 3a96ea7..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FilterInputStream;
-import java.io.InputStream;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.spi.StreamCachingStrategy;
-
-public class StreamCachingCustomShouldSpoolTaskTest extends ContextTestSupport {
-
-    private MyCustomShouldSpoolTask spoolTask = new MyCustomShouldSpoolTask();
-
-    @Override
-    protected void setUp() throws Exception {
-        deleteDirectory("target/cachedir");
-        super.setUp();
-    }
-
-    public void testByteArrayInputStream() throws Exception {
-        getMockEndpoint("mock:english").expectedBodiesReceived("<hello/>");
-        getMockEndpoint("mock:dutch").expectedBodiesReceived("<hallo/>");
-        getMockEndpoint("mock:german").expectedBodiesReceived("<hallo/>");
-        getMockEndpoint("mock:french").expectedBodiesReceived("<hellos/>");
-
-        // need to wrap in MyInputStream as ByteArrayInputStream is optimized to just reuse in memory buffer
-        // and not needed to spool to disk
-        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hello/>".getBytes())));
-
-        spoolTask.setSpool(true);
-        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hallo/>".getBytes())));
-        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hellos/>".getBytes())));
-
-        assertMockEndpointsSatisfied();
-    }
-
-    private class MyInputStream extends FilterInputStream {
-
-        private MyInputStream(InputStream in) {
-            super(in);
-        }
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
-                context.getStreamCachingStrategy().addShouldSpoolTask(spoolTask);
-                context.getStreamCachingStrategy().setAnySpoolTasks(true);
-                context.setStreamCaching(true);
-
-                from("direct:a")
-                    .choice()
-                        .when(xpath("//hello")).to("mock:english")
-                        .when(xpath("//hallo")).to("mock:dutch", "mock:german")
-                        .otherwise().to("mock:french")
-                    .end()
-                    .process(new Processor() {
-                        @Override
-                        public void process(Exchange exchange) throws Exception {
-                            // check if spool file exists
-                            if (spoolTask.isSpool()) {
-                                String[] names = new File("target/cachedir").list();
-                                assertEquals("There should be a cached spool file", 1, names.length);
-                            }
-                        }
-                    });
-
-            }
-        };
-    }
-
-    private static final class MyCustomShouldSpoolTask implements StreamCachingStrategy.ShouldSpoolTask {
-
-        private volatile boolean spool;
-
-        @Override
-        public boolean shouldSpoolCache(long length) {
-            return spool;
-        }
-
-        public boolean isSpool() {
-            return spool;
-        }
-
-        public void setSpool(boolean spool) {
-            this.spool = spool;
-        }
-
-        @Override
-        public String toString() {
-            return "MyCustomShouldSpoolTask";
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index 4b2de19..9fb8d9e 100644
--- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -18,6 +18,7 @@ package org.apache.camel.core.xml;
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -389,6 +390,10 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
         if (spoolThreshold != null) {
             getContext().getStreamCachingStrategy().setSpoolThreshold(spoolThreshold);
         }
+        Integer spoolUsedHeap = CamelContextHelper.parseInteger(getContext(), streamCaching.getSpoolUsedHeapMemoryThreshold());
+        if (spoolUsedHeap != null) {
+            getContext().getStreamCachingStrategy().setSpoolUsedHeapMemoryThreshold(spoolUsedHeap);
+        }
         String spoolChiper = CamelContextHelper.parseText(getContext(), streamCaching.getSpoolChiper());
         if (spoolChiper != null) {
             getContext().getStreamCachingStrategy().setSpoolChiper(spoolChiper);
@@ -401,6 +406,21 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
         if (statisticsEnabled != null) {
             getContext().getStreamCachingStrategy().getStatistics().setStatisticsEnabled(statisticsEnabled);
         }
+        Boolean anySpoolRules = CamelContextHelper.parseBoolean(getContext(), streamCaching.getAnySpoolRules());
+        if (anySpoolRules != null) {
+            getContext().getStreamCachingStrategy().setAnySpoolRules(anySpoolRules);
+        }
+        String spoolRules = CamelContextHelper.parseText(getContext(), streamCaching.getSpoolRules());
+        if (spoolRules != null) {
+            Iterator it = ObjectHelper.createIterator(spoolRules);
+            while (it.hasNext()) { // each entry is used as a registry bean name
+                String name = it.next().toString();
+                StreamCachingStrategy.SpoolRule rule = getContext().getRegistry().lookupByNameAndType(name, StreamCachingStrategy.SpoolRule.class);
+                if (rule != null) { // names with no matching SpoolRule in the registry are skipped
+                    getContext().getStreamCachingStrategy().addSpoolRule(rule);
+                }
+            }
+        }
     }
 
     protected void initPropertyPlaceholder() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/5369bf1c/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java
----------------------------------------------------------------------
diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java
index 157ff41..9a3ce52 100644
--- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java
+++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java
@@ -39,10 +39,16 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType {
     private String spoolDirectory;
 
     @XmlAttribute
+    private String spoolChiper;
+
+    @XmlAttribute
     private String spoolThreshold;
 
     @XmlAttribute
-    private String spoolChiper;
+    private String spoolUsedHeapMemoryThreshold;
+
+    @XmlAttribute
+    private String spoolRules;
 
     @XmlAttribute
     private String bufferSize;
@@ -53,6 +59,9 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType {
     @XmlAttribute
     private String statisticsEnabled;
 
+    @XmlAttribute
+    private String anySpoolRules;
+
     public String getEnabled() {
         return enabled;
     }
@@ -69,6 +78,14 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType {
         this.spoolDirectory = spoolDirectory;
     }
 
+    public String getSpoolChiper() {
+        return spoolChiper;
+    }
+
+    public void setSpoolChiper(String spoolChiper) {
+        this.spoolChiper = spoolChiper;
+    }
+
     public String getSpoolThreshold() {
         return spoolThreshold;
     }
@@ -77,12 +94,20 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType {
         this.spoolThreshold = spoolThreshold;
     }
 
-    public String getSpoolChiper() {
-        return spoolChiper;
+    public String getSpoolUsedHeapMemoryThreshold() {
+        return spoolUsedHeapMemoryThreshold;
     }
 
-    public void setSpoolChiper(String spoolChiper) {
-        this.spoolChiper = spoolChiper;
+    public void setSpoolUsedHeapMemoryThreshold(String spoolUsedHeapMemoryThreshold) {
+        this.spoolUsedHeapMemoryThreshold = spoolUsedHeapMemoryThreshold;
+    }
+
+    public String getSpoolRules() {
+        return spoolRules;
+    }
+
+    public void setSpoolRules(String spoolRules) {
+        this.spoolRules = spoolRules;
     }
 
     public String getBufferSize() {
@@ -108,4 +133,12 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType {
     public void setStatisticsEnabled(String statisticsEnabled) {
         this.statisticsEnabled = statisticsEnabled;
     }
+
+    public String getAnySpoolRules() {
+        return anySpoolRules;
+    }
+
+    public void setAnySpoolRules(String anySpoolRules) {
+        this.anySpoolRules = anySpoolRules;
+    }
 }
\ No newline at end of file


Mime
View raw message