activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r520823 - in /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor: DeadLetterChannel.java Logger.java RedeliveryPolicy.java
Date Wed, 21 Mar 2007 09:56:17 GMT
Author: jstrachan
Date: Wed Mar 21 02:56:11 2007
New Revision: 520823

URL: http://svn.apache.org/viewvc?view=rev&rev=520823
Log:
added an initial implementation of Dead Letter Channel

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java 
 (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
  (with props)

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=auto&rev=520823
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Wed Mar 21 02:56:11 2007
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Implements a
+ * <a href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
Channel</a>
+ * after attempting to redeliver the message using the {@link RedeliveryPolicy}
+ *
+ * @version $Revision$
+ */
+public class DeadLetterChannel<E extends Exchange> implements Processor<E> {
+    public static final String REDELIVERY_COUNT_HEADER = "org.apache.camel.redeliveryCount";
+
+    private static final transient Log log = LogFactory.getLog(DeadLetterChannel.class);
+    private Processor<E> output;
+    private Processor<E> deadLetter;
+    private RedeliveryPolicy redeliveryPolicy;
+    private String redeliveryCountHeader = REDELIVERY_COUNT_HEADER;
+
+    public DeadLetterChannel(Processor<E> output, Processor<E> deadLetter) {
+        this(output, deadLetter, new RedeliveryPolicy());
+    }
+
+    public DeadLetterChannel(Processor<E> output, Processor<E> deadLetter, RedeliveryPolicy
redeliveryPolicy) {
+        this.deadLetter = deadLetter;
+        this.output = output;
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    public void onExchange(E exchange) {
+        int redeliveryCounter = 0;
+        long redeliveryDelay = 0;
+
+        do {
+            if (redeliveryCounter++ > 0) {
+                // Figure out how long we should wait to resend this message.
+                redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+                sleep(redeliveryDelay);
+                appendRedeliveryHeaders(exchange, redeliveryCounter);
+            }
+
+            try {
+                output.onExchange(exchange);
+                return;
+            }
+            catch (RuntimeException e) {
+                log.error("On delivery attempt: " + redeliveryCounter + " caught: " + e,
e);
+            }
+        }
+        while (redeliveryPolicy.shouldRedeliver(redeliveryCounter));
+
+        // now lets send to the dead letter queue
+        deadLetter.onExchange(exchange);
+    }
+
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public RedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    /**
+     * Sets the redelivery policy
+     */
+    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    public String getRedeliveryCountHeader() {
+        return redeliveryCountHeader;
+    }
+
+    /**
+     * Sets the message header name to be used to append the redelivery count value when
a message has been redelivered
+     *
+     * @param redeliveryCountHeader the header name to use to append the redelivery count
or null if you wish to disable
+     * this feature
+     */
+    public void setRedeliveryCountHeader(String redeliveryCountHeader) {
+        this.redeliveryCountHeader = redeliveryCountHeader;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+    protected void appendRedeliveryHeaders(E exchange, int redeliveryCounter) {
+        String header = getRedeliveryCountHeader();
+        if (header != null) {
+            exchange.getIn().getHeaders().setHeader(header, redeliveryCounter);
+        }
+    }
+
+    protected void sleep(long redeliveryDelay) {
+        if (redeliveryDelay > 0) {
+        if (log.isDebugEnabled()) {
+            log.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
+        }
+            try {
+                Thread.sleep(redeliveryDelay);
+            }
+            catch (InterruptedException e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Thread interupted: " + e, e);
+                }
+            }
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java?view=auto&rev=520823
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java Wed
Mar 21 02:56:11 2007
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link Processor} which just logs to a {@link Log} object which can be used as an exception
+ * handler instead of using a dead letter queue.
+ *
+ * @version $Revision$
+ */
+public class Logger<E extends Exchange> implements Processor<E> {
+    private Log log;
+    private Level level = Level.INFO;
+
+    public static enum Level {
+        DEBUG, ERROR, FATAL, INFO, TRACE, WARN;
+    };
+
+    public Logger() {
+        this(LogFactory.getLog(Logger.class));
+    }
+
+    public Logger(Log log) {
+        this.log = log;
+    }
+
+    public void onExchange(E exchange) {
+        switch (level) {
+            case DEBUG:
+                if (log.isDebugEnabled()) {
+                    log.debug(logMessage(exchange));
+                }
+                break;
+            case ERROR:
+                if (log.isErrorEnabled()) {
+                    log.error(logMessage(exchange));
+                }
+                break;
+            case FATAL:
+                if (log.isFatalEnabled()) {
+                    log.fatal(logMessage(exchange));
+                }
+                break;
+            case INFO:
+                if (log.isInfoEnabled()) {
+                    log.debug(logMessage(exchange));
+                }
+                break;
+            case TRACE:
+                if (log.isTraceEnabled()) {
+                    log.trace(logMessage(exchange));
+                }
+                break;
+            case WARN:
+                if (log.isWarnEnabled()) {
+                    log.warn(logMessage(exchange));
+                }
+                break;
+            default:
+                log.error("Unknown level: " + level + " when trying to log exchange: " +
logMessage(exchange));
+        }
+    }
+
+    protected Object logMessage(E exchange) {
+        return exchange;
+    }
+
+    public Log getLog() {
+        return log;
+    }
+
+    public void setLog(Log log) {
+        this.log = log;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?view=auto&rev=520823
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
Wed Mar 21 02:56:11 2007
@@ -0,0 +1,226 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Random;
+
+// Code taken from the ActiveMQ codebase
+
+/**
+ * The policy used to decide how many times to redeliver and the time between the redeliveries
before being sent to a
+ * <a href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
Channel</a>
+ *
+ * @version $Revision$
+ */
+public class RedeliveryPolicy {
+    protected int maximumRedeliveries = 6;
+    protected long initialRedeliveryDelay = 1000L;
+    protected double backOffMultiplier = 2;
+    protected boolean useExponentialBackOff = false;
+    // +/-15% for a 30% spread -cgs
+    protected double collisionAvoidanceFactor = 0.15d;
+    protected boolean useCollisionAvoidance = false;
+    protected static Random randomNumberGenerator;
+
+    public RedeliveryPolicy() {
+    }
+
+    public RedeliveryPolicy copy() {
+        try {
+            return (RedeliveryPolicy) clone();
+        }
+        catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Could not clone: " + e, e);
+        }
+    }
+
+    /**
+     * Returns true if the policy decides that the message exchange should be redelivered
+     */
+    public boolean shouldRedeliver(int redeliveryCounter) {
+        return redeliveryCounter <= getMaximumRedeliveries();
+    }
+
+    // Builder methods
+    //-------------------------------------------------------------------------
+
+    /**
+     * Sets the maximum number of times a message exchange will be redelivered
+     */
+    public RedeliveryPolicy maximumRedeliveries(int maximumRedeliveries) {
+        setMaximumRedeliveries(maximumRedeliveries);
+        return this;
+    }
+
+    /**
+     * Sets the initial redelivery delay in milliseconds on the first redelivery
+     */
+    public RedeliveryPolicy initialRedeliveryDelay(long initialRedeliveryDelay) {
+        setInitialRedeliveryDelay(initialRedeliveryDelay);
+        return this;
+    }
+
+    /**
+     * Enables collision avoidence which adds some randomization to the backoff timings to
reduce contention probability
+     */
+    public RedeliveryPolicy useCollisionAvoidance() {
+        setUseCollisionAvoidance(true);
+        return this;
+    }
+
+    /**
+     * Enables exponential backof using the {@link #getBackOffMultiplier()} to increase the
time between retries
+     */
+    public RedeliveryPolicy useExponentialBackOff() {
+        setUseExponentialBackOff(true);
+        return this;
+    }
+
+    /**
+     * Enables exponential backoff and sets the multiplier used to increase the delay between
redeliveries
+     */
+    public RedeliveryPolicy backOffMultiplier(double backOffMultiplier) {
+        useExponentialBackOff();
+        setBackOffMultiplier(backOffMultiplier);
+        return this;
+    }
+
+    /**
+     * Enables collision avoidence and sets the percentage used
+     */
+    public RedeliveryPolicy collisionAvoidancePercent(short collisionAvoidancePercent) {
+        useCollisionAvoidance();
+        setCollisionAvoidancePercent(collisionAvoidancePercent);
+        return this;
+    }
+       
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public double getBackOffMultiplier() {
+        return backOffMultiplier;
+    }
+
+    /**
+     * Sets the multiplier used to increase the delay between redeliveries if {@link #setUseExponentialBackOff(boolean)}
is enabled
+     */
+    public void setBackOffMultiplier(double backOffMultiplier) {
+        this.backOffMultiplier = backOffMultiplier;
+    }
+
+    public short getCollisionAvoidancePercent() {
+        return (short) Math.round(collisionAvoidanceFactor * 100);
+    }
+
+    /**
+     * Sets the percentage used for collision avoidence if enabled via {@link #setUseCollisionAvoidance(boolean)}
+     */
+    public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
+        this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
+    }
+
+    public double getCollisionAvoidanceFactor() {
+        return collisionAvoidanceFactor;
+    }
+
+    /**
+     * Sets the factor used for collision avoidence if enabled via {@link #setUseCollisionAvoidance(boolean)}
+     */
+    public void setCollisionAvoidanceFactor(double collisionAvoidanceFactor) {
+        this.collisionAvoidanceFactor = collisionAvoidanceFactor;
+    }
+
+    public long getInitialRedeliveryDelay() {
+        return initialRedeliveryDelay;
+    }
+
+    /**
+     * Sets the initial redelivery delay in milliseconds on the first redelivery
+     */
+    public void setInitialRedeliveryDelay(long initialRedeliveryDelay) {
+        this.initialRedeliveryDelay = initialRedeliveryDelay;
+    }
+
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    /**
+     * Sets the maximum number of times a message exchange will be redelivered
+     */
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    public long getRedeliveryDelay(long previousDelay) {
+        long redeliveryDelay;
+
+        if (previousDelay == 0) {
+            redeliveryDelay = initialRedeliveryDelay;
+        }
+        else if (useExponentialBackOff && backOffMultiplier > 1) {
+            redeliveryDelay = Math.round(backOffMultiplier * previousDelay);
+        }
+        else {
+            redeliveryDelay = previousDelay;
+        }
+
+        if (useCollisionAvoidance) {
+            if (randomNumberGenerator == null) {
+                initRandomNumberGenerator();
+            }
+
+            /*
+             * First random determines +/-, second random determines how far to
+             * go in that direction. -cgs
+             */
+            double variance = (randomNumberGenerator.nextBoolean() ? collisionAvoidanceFactor
: -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
+            redeliveryDelay += redeliveryDelay * variance;
+        }
+
+        return redeliveryDelay;
+    }
+
+    public boolean isUseCollisionAvoidance() {
+        return useCollisionAvoidance;
+    }
+
+    /**
+     * Enables/disables collision avoidence which adds some randomization to the backoff
timings to reduce contention probability
+     */
+    public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
+        this.useCollisionAvoidance = useCollisionAvoidance;
+    }
+
+    public boolean isUseExponentialBackOff() {
+        return useExponentialBackOff;
+    }
+
+    /**
+     * Enables/disables exponential backof using the {@link #getBackOffMultiplier()} to increase
the time between retries
+     */
+    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+        this.useExponentialBackOff = useExponentialBackOff;
+    }
+
+    protected static synchronized void initRandomNumberGenerator() {
+        if (randomNumberGenerator == null) {
+            randomNumberGenerator = new Random();
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message