Author: gtully
Date: Tue Nov 2 12:05:00 2010
New Revision: 1030013
URL: http://svn.apache.org/viewvc?rev=1030013&view=rev
Log:
Resolve the duplicate-message issue with concurrent durable subs and the DLQ by suppressing
duplicates in the default dead letter strategy. With no duplicates,
concurrentStoreAndDispatchQueues=true is fine. Resolves https://issues.apache.org/activemq/browse/AMQ-2584
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1030013&r1=1030012&r2=1030013&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Nov 2 12:05:00 2010
@@ -832,7 +832,8 @@ public class RegionBroker extends EmptyB
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Expired message with no DLQ strategy in place");
+ LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
+ + message.getMessageId() + ", destination: " + message.getDestination());
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?rev=1030013&r1=1030012&r2=1030013&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java Tue Nov 2 12:05:00 2010
@@ -16,7 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A strategy for choosing which destination is used for dead letter queue
@@ -25,13 +28,21 @@ import org.apache.activemq.command.Messa
* @version $Revision: 426366 $
*/
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
+ private static final Log LOG = LogFactory.getLog(AbstractDeadLetterStrategy.class);
private boolean processNonPersistent = false;
private boolean processExpired = true;
+ private ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
public boolean isSendToDeadLetterQueue(Message message) {
boolean result = false;
if (message != null) {
result = true;
+ if (audit.isDuplicate(message)) {
+ result = false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not adding duplicate to DLQ: " + message.getMessageId() + ", dest: " + message.getDestination());
+ }
+ }
if (!message.isPersistent() && !processNonPersistent) {
result = false;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java?rev=1030013&r1=1030012&r2=1030013&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java Tue Nov 2 12:05:00 2010
@@ -224,7 +224,12 @@ public class AMQ2584ConcurrentDlqTest ex
properties.put("maxFileLength", maxFileLengthVal);
properties.put("cleanupInterval", "2000");
properties.put("checkpointInterval", "2000");
- properties.put("concurrentStoreAndDispatchQueues", "false");
+ // There are problems with duplicate dispatch in the cursor, which maintains
+ // a map of messages. A duplicate dispatch can be dropped.
+ // see: org.apache.activemq.broker.region.cursors.OrderedPendingList
+ // Adding duplicate detection to the default DLQ strategy removes the problem,
+ // which means we can leave the default for concurrent store and dispatch queues.
+ //properties.put("concurrentStoreAndDispatchQueues", "false");
IntrospectionSupport.setProperties(persistenceAdapter, properties);
}
|