camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] git commit: CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow 3rd party to plugin custom strategies. Work in progress.
Date Mon, 22 Jul 2013 13:56:07 GMT
Updated Branches:
  refs/heads/master 3d7076cef -> 5369bf1ca


CAMEL-6476: Introducing StreamCachingStrategy SPI to make it easier to configure and allow
3rd party to plugin custom strategies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b11f4e71
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b11f4e71
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b11f4e71

Branch: refs/heads/master
Commit: b11f4e7144fb9c9ce24cfb9d74640e920d1c786f
Parents: 3d7076c
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Jul 22 15:11:22 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Jul 22 15:11:22 2013 +0200

----------------------------------------------------------------------
 .../ManagedStreamCachingStrategyMBean.java      |   6 +
 .../apache/camel/impl/DefaultCamelContext.java  |   4 +
 .../impl/DefaultStreamCachingStrategy.java      | 139 +++++++++++++++++--
 .../mbean/ManagedStreamCachingStrategy.java     |   8 ++
 .../apache/camel/spi/StreamCachingStrategy.java |  44 ++++++
 .../StreamCachingCustomShouldSpoolTaskTest.java | 118 ++++++++++++++++
 6 files changed, 305 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
index dd52a93..ea49ec5 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java
@@ -48,6 +48,12 @@ public interface ManagedStreamCachingStrategyMBean {
     @ManagedAttribute(description = "Whether to remove spool directory when stopping")
     boolean isRemoveSpoolDirectoryWhenStopping();
 
+    @ManagedAttribute(description = "Whether any or all should spool tasks determines should spool")
+    void setAnySpoolTasks(boolean any);
+
+    @ManagedAttribute(description = "Whether any or all should spool tasks determines should spool")
+    boolean isAnySpoolTasks();
+
     @ManagedAttribute(description = "Number of in-memory StreamCache created")
     long getCacheMemoryCounter();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 1d92e4f..a75af0e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -1675,6 +1675,10 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
             // stream caching is in use so enable the strategy
             getStreamCachingStrategy().setEnabled(true);
             addService(getStreamCachingStrategy());
+        } else {
+            // log if stream caching is not in use as this can help people to enable it if they use streams
+            log.info("StreamCaching is not in use. If using streams then its recommended to enable stream caching."
+                    + " See more details at http://camel.apache.org/stream-caching.html");
         }
 
         // start routes

http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
index 15dd854..ddacc5d 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java
@@ -17,6 +17,10 @@
 package org.apache.camel.impl;
 
 import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.camel.CamelContext;
@@ -35,7 +39,7 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy {
 
-    // TODO: add memory based watermarks for spool to disk
+    // TODO: add easy configuration in XML to add custom should spool tasks
 
     @Deprecated
     public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
@@ -53,10 +57,13 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
     private File spoolDirectory;
     private transient String spoolDirectoryName = "${java.io.tmpdir}camel-tmp-#uuid#";
     private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
+    private int spoolHeapMemoryWatermarkThreshold;
     private String spoolChiper;
     private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE;
     private boolean removeSpoolDirectoryWhenStopping = true;
     private final UtilizationStatistics statistics = new UtilizationStatistics();
+    private final Set<ShouldSpoolTask> spoolTasks = new LinkedHashSet<ShouldSpoolTask>();
+    private boolean anySpoolTasks;
 
     public CamelContext getCamelContext() {
         return camelContext;
@@ -90,6 +97,14 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         return spoolThreshold;
     }
 
+    public int getSpoolHeapMemoryWatermarkThreshold() {
+        return spoolHeapMemoryWatermarkThreshold;
+    }
+
+    public void setSpoolHeapMemoryWatermarkThreshold(int spoolHeapMemoryWatermarkThreshold) {
+        this.spoolHeapMemoryWatermarkThreshold = spoolHeapMemoryWatermarkThreshold;
+    }
+
     public void setSpoolThreshold(long spoolThreshold) {
         this.spoolThreshold = spoolThreshold;
     }
@@ -118,15 +133,42 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         this.removeSpoolDirectoryWhenStopping = removeSpoolDirectoryWhenStopping;
     }
 
+    public boolean isAnySpoolTasks() {
+        return anySpoolTasks;
+    }
+
+    public void setAnySpoolTasks(boolean anySpoolTasks) {
+        this.anySpoolTasks = anySpoolTasks;
+    }
+
     public Statistics getStatistics() {
         return statistics;
     }
 
     public boolean shouldSpoolCache(long length) {
-        if (spoolThreshold > 0 && length >= spoolThreshold) {
-            return true;
+        if (spoolTasks.isEmpty()) {
+            return false;
+        }
+
+        boolean all = true;
+        boolean any = false;
+        for (ShouldSpoolTask task : spoolTasks) {
+            boolean result = task.shouldSpoolCache(length);
+            if (!result) {
+                all = false;
+            } else {
+                any = true;
+                if (anySpoolTasks) {
+                    // no need to check anymore
+                    break;
+                }
+            }
         }
-        return false;
+        return anySpoolTasks ? any : all;
+    }
+
+    public void addShouldSpoolTask(ShouldSpoolTask task) {
+        spoolTasks.add(task);
     }
 
     public StreamCache cache(Exchange exchange) {
@@ -183,18 +225,24 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
             LOG.warn("Configuring of StreamCaching using CamelContext properties is deprecated - use StreamCachingStrategy instead.");
         }
 
+        if (spoolHeapMemoryWatermarkThreshold < 0 || spoolHeapMemoryWatermarkThreshold > 100) {
+            throw new IllegalArgumentException("SpoolHeapMemoryWatermarkThreshold must be a value between 0 and 100, was: " + spoolHeapMemoryWatermarkThreshold);
+        }
+
         // if we can overflow to disk then make sure directory exists / is created
-        if (spoolThreshold > 0) {
+        if (spoolThreshold > 0 || spoolHeapMemoryWatermarkThreshold > 0) {
 
             if (spoolDirectory == null && spoolDirectoryName == null) {
                 throw new IllegalArgumentException("SpoolDirectory must be configured when using SpoolThreshold > 0");
             }
 
-            if (spoolDirectory == null && spoolDirectoryName != null) {
+            if (spoolDirectory == null) {
                 String name = resolveSpoolDirectory(spoolDirectoryName);
                 if (name != null) {
                     spoolDirectory = new File(name);
                     spoolDirectoryName = null;
+                } else {
+                    throw new IllegalStateException("Cannot resolve spool directory from pattern: " + spoolDirectoryName);
                 }
             }
 
@@ -211,15 +259,23 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
                 } else {
                     LOG.debug("Created spool directory: {}", spoolDirectory);
                 }
+
+            }
+
+            if (spoolThreshold > 0) {
+                spoolTasks.add(new FixedThresholdShouldSpoolTask(spoolThreshold));
+            }
+            if (spoolHeapMemoryWatermarkThreshold > 0) {
+                spoolTasks.add(new UsedHeapMemoryShouldSpoolTask(spoolHeapMemoryWatermarkThreshold));
             }
         }
 
         LOG.debug("StreamCaching configuration {}", this.toString());
 
-        if (spoolThreshold > 0) {
-            LOG.info("StreamCaching in use and overflow to disk enabled when > {} bytes to directory: {}", spoolThreshold, spoolDirectory);
+        if (spoolDirectory != null) {
+            LOG.info("StreamCaching in use with spool directory: {} and thresholds: {}", spoolDirectory.getPath(), spoolTasks.toString());
         } else {
-            LOG.info("StreamCaching in use with no overflow to disk (memory only)");
+            LOG.info("StreamCaching in use with thresholds: {}", spoolTasks.toString());
         }
     }
 
@@ -263,11 +319,66 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
             + "spoolDirectory=" + spoolDirectory
             + ", spoolThreshold=" + spoolThreshold
             + ", spoolChiper=" + spoolChiper
-            + ", bufferSize=" + bufferSize + "]";
+            + ", bufferSize=" + bufferSize
+            + ", anySpoolTasks=" + anySpoolTasks + "]";
+    }
+
+    private static final class FixedThresholdShouldSpoolTask implements ShouldSpoolTask {
+
+        private final long threshold;
+
+        private FixedThresholdShouldSpoolTask(long threshold) {
+            this.threshold = threshold;
+        }
+
+        public boolean shouldSpoolCache(long length) {
+            if (threshold > 0 && length > threshold) {
+                LOG.trace("Should spool cache {} > {} -> true", length, threshold);
+                return true;
+            }
+            return false;
+        }
+
+        public String toString() {
+            if (threshold < 1024) {
+                return "Spool > " + threshold + " bytes body size";
+            } else {
+                return "Spool > " + (threshold >> 10) + "K body size";
+            }
+        }
+    }
+
+    private final class UsedHeapMemoryShouldSpoolTask implements ShouldSpoolTask {
+
+        private final MemoryMXBean heapUsage;
+        private final int spoolPercentage;
+
+        private UsedHeapMemoryShouldSpoolTask(int spoolPercentage) {
+            this.spoolPercentage = spoolPercentage;
+            this.heapUsage = ManagementFactory.getMemoryMXBean();
+        }
+
+        public boolean shouldSpoolCache(long length) {
+            if (spoolPercentage > 0) {
+                long used = heapUsage.getHeapMemoryUsage().getUsed();
+                long committed = heapUsage.getHeapMemoryUsage().getCommitted();
+                long percentage = committed / used * 100;
+                LOG.trace("Heap memory: [used=%sK (%sK\\%), committed=%sK]", new Object[]{used >> 10, percentage, committed >> 10});
+                if (percentage >= spoolPercentage) {
+                    LOG.trace("Should spool cache {} > {} -> true", percentage, spoolPercentage);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public String toString() {
+            return "Spool > " + spoolPercentage + "% used heap memory";
+        }
     }
 
     /**
-     * Represents utilization statistics
+     * Represents utilization statistics.
      */
     private final class UtilizationStatistics implements Statistics {
 
@@ -279,13 +390,13 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
         private volatile long spoolSize;
         private volatile long spoolAverageSize;
 
-        void updateMemory(long size) {
+        synchronized void updateMemory(long size) {
             memoryCounter++;
             memorySize += size;
             memoryAverageSize = memorySize / memoryCounter;
         }
 
-        void updateSpool(long size) {
+        synchronized void updateSpool(long size) {
             spoolCounter++;
             spoolSize += size;
             spoolAverageSize = spoolSize / spoolCounter;
@@ -315,7 +426,7 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi
             return spoolAverageSize;
         }
 
-        public void reset() {
+        public synchronized void reset() {
             memoryCounter = 0;
             memorySize = 0;
             memoryAverageSize = 0;

http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
index 7a6f81e..e831d1f 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java
@@ -82,6 +82,14 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana
         return streamCachingStrategy.isRemoveSpoolDirectoryWhenStopping();
     }
 
+    public void setAnySpoolTasks(boolean any) {
+        streamCachingStrategy.setAnySpoolTasks(any);
+    }
+
+    public boolean isAnySpoolTasks() {
+        return streamCachingStrategy.isAnySpoolTasks();
+    }
+
     public long getCacheMemoryCounter() {
         return streamCachingStrategy.getStatistics().getCacheMemoryCounter();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
index 52c6fad..c44eb4d 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java
@@ -81,6 +81,24 @@ public interface StreamCachingStrategy extends Service {
     }
 
     /**
+     * Task for determine if stream caching should be spooled to disk or kept in-memory.
+     */
+    interface ShouldSpoolTask {
+
+        /**
+         * Determines if the stream should be spooled or not. For example if the stream length is
+         * over a threshold.
+         * <p/>
+         * This allows implementations to use custom strategies to determine if spooling is needed or not.
+         *
+         * @param length the length of the stream
+         * @return <tt>true</tt> to spool the cache, or <tt>false</tt> to keep the cache in-memory
+         */
+        boolean shouldSpoolCache(long length);
+
+    }
+
+    /**
      * Sets whether the stream caching is enabled.
      * <p/>
      * <b>Notice:</b> This cannot be changed at runtime.
@@ -112,6 +130,15 @@ public interface StreamCachingStrategy extends Service {
     long getSpoolThreshold();
 
     /**
+     * Sets a percentage (0-100) of used heap memory threshold to activate spooling to disk.
+     *
+     * @param percentage percentage of used heap memory.
+     */
+    void setSpoolHeapMemoryWatermarkThreshold(int percentage);
+
+    int getSpoolHeapMemoryWatermarkThreshold();
+
+    /**
      * Sets the buffer size to use when allocating in-memory buffers used for in-memory stream caches.
      * <p/>
      * The default size is {@link org.apache.camel.util.IOHelper#DEFAULT_BUFFER_SIZE}
@@ -139,11 +166,28 @@ public interface StreamCachingStrategy extends Service {
     boolean isRemoveSpoolDirectoryWhenStopping();
 
     /**
+     * Sets whether if just any of the {@link ShouldSpoolTask}
+     * returns <tt>true</tt> then {@link #shouldSpoolCache(long)} returns <tt>true</tt>.
+     * If this option is <tt>false</tt>, then <b>all</b> the {@link ShouldSpoolTask} must
+     * return <tt>true</tt>.
+     * <p/>
+     * The default value is <tt>false</tt>
+     */
+    void setAnySpoolTasks(boolean any);
+
+    boolean isAnySpoolTasks();
+
+    /**
      * Gets the utilization statistics.
      */
     Statistics getStatistics();
 
     /**
+     * Adds the {@link ShouldSpoolTask} to be used.
+     */
+    void addShouldSpoolTask(ShouldSpoolTask task);
+
+    /**
      * Determines if the stream should be spooled or not. For example if the stream length is
      * over a threshold.
      * <p/>

http://git-wip-us.apache.org/repos/asf/camel/blob/b11f4e71/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java
new file mode 100644
index 0000000..3a96ea7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingCustomShouldSpoolTaskTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FilterInputStream;
+import java.io.InputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.StreamCachingStrategy;
+
+public class StreamCachingCustomShouldSpoolTaskTest extends ContextTestSupport {
+
+    private MyCustomShouldSpoolTask spoolTask = new MyCustomShouldSpoolTask();
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/cachedir");
+        super.setUp();
+    }
+
+    public void testByteArrayInputStream() throws Exception {
+        getMockEndpoint("mock:english").expectedBodiesReceived("<hello/>");
+        getMockEndpoint("mock:dutch").expectedBodiesReceived("<hallo/>");
+        getMockEndpoint("mock:german").expectedBodiesReceived("<hallo/>");
+        getMockEndpoint("mock:french").expectedBodiesReceived("<hellos/>");
+
+        // need to wrap in MyInputStream as ByteArrayInputStream is optimized to just reuse in memory buffer
+        // and not needed to spool to disk
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hello/>".getBytes())));
+
+        spoolTask.setSpool(true);
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hallo/>".getBytes())));
+        template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hellos/>".getBytes())));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private class MyInputStream extends FilterInputStream {
+
+        private MyInputStream(InputStream in) {
+            super(in);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
+                context.getStreamCachingStrategy().addShouldSpoolTask(spoolTask);
+                context.getStreamCachingStrategy().setAnySpoolTasks(true);
+                context.setStreamCaching(true);
+
+                from("direct:a")
+                    .choice()
+                        .when(xpath("//hello")).to("mock:english")
+                        .when(xpath("//hallo")).to("mock:dutch", "mock:german")
+                        .otherwise().to("mock:french")
+                    .end()
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // check if spool file exists
+                            if (spoolTask.isSpool()) {
+                                String[] names = new File("target/cachedir").list();
+                                assertEquals("There should be a cached spool file", 1, names.length);
+                            }
+                        }
+                    });
+
+            }
+        };
+    }
+
+    private static final class MyCustomShouldSpoolTask implements StreamCachingStrategy.ShouldSpoolTask {
+
+        private volatile boolean spool;
+
+        @Override
+        public boolean shouldSpoolCache(long length) {
+            return spool;
+        }
+
+        public boolean isSpool() {
+            return spool;
+        }
+
+        public void setSpool(boolean spool) {
+            this.spool = spool;
+        }
+
+        @Override
+        public String toString() {
+            return "MyCustomShouldSpoolTask";
+        }
+    }
+}
+


Mime
View raw message