activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r936390 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/network/ tes...
Date Wed, 21 Apr 2010 16:45:30 GMT
Author: gtully
Date: Wed Apr 21 16:45:30 2010
New Revision: 936390

URL: http://svn.apache.org/viewvc?rev=936390&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2704 - add message audit to topic sub
so that a regular sub can behave like store backed subscriptions which already suppress duplicates.
Duplicates occur in a ring network topology where there are two equal and valid routes for a message;
see the test case.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Wed Apr 21 16:45:30 2010
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.jms.JMSException;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -62,6 +63,12 @@ public class TopicSubscription extends A
     private final AtomicLong dequeueCounter = new AtomicLong(0);
     private int memoryUsageHighWaterMark = 95;
     private boolean slowConsumer;
+    
+    // allow duplicate suppression in a ring network of brokers
+    protected int maxProducersToAudit = 1024;
+    protected int maxAuditDepth = 1000;
+    protected boolean enableAudit = false;
+    protected ActiveMQMessageAudit audit;
 
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info,
SystemUsage usageManager) throws Exception {
         super(broker, context, info);
@@ -78,9 +85,15 @@ public class TopicSubscription extends A
         this.matched.setSystemUsage(usageManager);
         this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.matched.start();
+        if (enableAudit) {
+            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
+        }
     }
 
     public void add(MessageReference node) throws Exception {
+        if (isDuplicate(node)) {
+            return;
+        }
         enqueueCounter.incrementAndGet();
         if (!isFull() && matched.isEmpty()  && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages which
@@ -158,6 +171,19 @@ public class TopicSubscription extends A
         }
     }
 
+    private boolean isDuplicate(MessageReference node) {
+        boolean duplicate = false;
+        if (enableAudit && audit != null) {
+            duplicate = audit.isDuplicate(node);
+            if (LOG.isDebugEnabled()) {
+                if (duplicate) {
+                    LOG.debug("ignoring duplicate add: " + node.getMessageId());
+                }
+            }
+        }
+        return duplicate;
+    }
+
     /**
      * Discard any expired messages from the matched list. Called from a
      * synchronized block.
@@ -313,6 +339,39 @@ public class TopicSubscription extends A
         this.messageEvictionStrategy = messageEvictionStrategy;
     }
 
+    public int getMaxProducersToAudit() {
+        return maxProducersToAudit;
+    }
+
+    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+        if (audit != null) {
+            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
+        }
+    }
+
+    public int getMaxAuditDepth() {
+        return maxAuditDepth;
+    }
+    
+    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+        if (audit != null) {
+            audit.setAuditDepth(maxAuditDepth);
+        }
+    }
+    
+    public boolean isEnableAudit() {
+        return enableAudit;
+    }
+
+    public synchronized void setEnableAudit(boolean enableAudit) {
+        this.enableAudit = enableAudit;
+        if (enableAudit && audit==null) {
+            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+        }
+    }
+    
     // Implementation methods
     // -------------------------------------------------------------------------
     public boolean isFull() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Wed Apr 21 16:45:30 2010
@@ -91,7 +91,9 @@ public abstract class AbstractStoreCurso
              * the cache. If subsequently, we pull out that message from the store (before
its deleted)
              * it will be a duplicate - but should be ignored
              */
-            //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + "
cursor got duplicate: " + message);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" cursor got duplicate: " + message);
+            }
             storeHasMessages = true;
         }
         return recovered;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Wed Apr 21 16:45:30 2010
@@ -178,6 +178,11 @@ public class PolicyEntry extends Destina
             int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
             subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name,
maxBatchSize));
         }
+        if (enableAudit) {
+            subscription.setEnableAudit(enableAudit);
+            subscription.setMaxProducersToAudit(maxProducersToAudit);
+            subscription.setMaxAuditDepth(maxAuditDepth);
+        }
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription
sub) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Apr 21 16:45:30 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.network;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.cert.X509Certificate;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
@@ -680,9 +681,9 @@ public abstract class DemandForwardingBr
                     final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses())
{
                         
-                        if (originallyCameFromRemote(md, sub)) {
+                        if (suppressMessageDispatch(md, sub)) {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(configuration.getBrokerName() + " message not forwarded
to " + remoteBrokerName + " because message came from there or fails networkTTL: " + md.getMessage());
+                                LOG.debug(configuration.getBrokerName() + " message not forwarded
to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath:
" + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
                             }
                             // still ack as it may be durable
                             try {
@@ -695,7 +696,7 @@ public abstract class DemandForwardingBr
                         
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("bridging " + configuration.getBrokerName() + " ->
" + remoteBrokerName + ": " + message);
+                            LOG.debug("bridging " + configuration.getBrokerName() + " ->
" + remoteBrokerName + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message:
" + message);
                         }
                         
                         if (!message.isResponseRequired()) {
@@ -776,25 +777,25 @@ public abstract class DemandForwardingBr
         }
     }
 
-    private boolean originallyCameFromRemote(MessageDispatch md, DemandSubscription sub)
throws Exception {
+    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws
Exception {
         // See if this consumer's brokerPath tells us it came from the broker at the other
end
         // of the bridge. I think we should be making this decision based on the message's
         // broker bread crumbs and not the consumer's? However, the message's broker bread
         // crumbs are null, which is another matter.   
-        boolean cameFromRemote = false;
+        boolean suppress = false;
         Object consumerInfo = md.getMessage().getDataStructure();
         if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
-            cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
+            suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
         }
         
         // for durable subs, suppression via filter leaves dangling acks so we need to 
         // check here and allow the ack irrespective
-        if (!cameFromRemote && sub.getLocalInfo().isDurable()) {
+        if (!suppress && sub.getLocalInfo().isDurable()) {
             MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
             messageEvalContext.setMessageReference(md.getMessage());
-            cameFromRemote = !createNetworkBridgeFilter(null).matches(messageEvalContext);
+            suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext);
         }  
-        return cameFromRemote;
+        return suppress;
     }
 
     /**
@@ -1154,7 +1155,7 @@ public abstract class DemandForwardingBr
             sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
         } else  {
             // need to ack this message if it is ignored as it is durable so
-            // we check before we send. see: originallyCameFromRemote()
+            // we check before we send. see: suppressMessageDispatch()
         }
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
Wed Apr 21 16:45:30 2010
@@ -17,15 +17,21 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
+import javax.jms.Topic;
 
 import junit.framework.Test;
 
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.transport.failover.FailoverUriTest;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.util.MessageIdList;
 
 /**
@@ -242,6 +248,106 @@ public class ThreeBrokerTopicNetworkTest
         assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount());
     }
 
+    public void testAllConnectedBrokerNetworkSingleProducerTTL() throws Exception {
+        
+        // duplicates are expected with ttl of 2 as each broker is connected to the next
+        // but the dups are suppressed by the store and now also by the topic sub when enableAudit
+        // default (true) is present in a matching destination policy entry
+        int networkTTL = 2;
+        boolean conduitSubs = true;
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerC", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
+
+        PolicyMap policyMap = new PolicyMap();
+        // enable audit is on by default just need to give it matching policy entry
+        // so it will be applied to the topic subscription
+        policyMap.setDefaultEntry(new PolicyEntry());
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = i.next().broker;
+            broker.setDestinationPolicy(policyMap);
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerA", dest);
+        MessageConsumer clientB = createConsumer("BrokerB", dest);
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+        //let consumers propogate around the network
+        Thread.sleep(2000);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 1);
+        
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(1);
+        msgsB.waitForMessagesToArrive(1);
+        msgsC.waitForMessagesToArrive(1);
+
+        // ensure we don't get any more messages
+        Thread.sleep(2000);
+        
+        assertEquals(1, msgsA.getMessageCount());
+        assertEquals(1, msgsB.getMessageCount());
+        assertEquals(1, msgsC.getMessageCount());
+    }
+
+    public void testAllConnectedBrokerNetworkDurableSubTTL() throws Exception {
+        int networkTTL = 2;
+        boolean conduitSubs = true;
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerC", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
+        bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createDurableSubscriber("BrokerA", (Topic)dest, "clientA");
+        MessageConsumer clientB = createDurableSubscriber("BrokerB", (Topic)dest, "clientB");
+        MessageConsumer clientC = createDurableSubscriber("BrokerC", (Topic)dest, "clientC");
+        //let consumers propogate around the network
+        Thread.sleep(2000);
+
+        // Send messages
+        sendMessages("BrokerA", dest, 1);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(1);
+        msgsB.waitForMessagesToArrive(1);
+        msgsC.waitForMessagesToArrive(1);
+
+        // ensure we don't get any more messages
+        Thread.sleep(2000);
+        
+        assertEquals(1, msgsA.getMessageCount());
+        assertEquals(1, msgsB.getMessageCount());
+        assertEquals(1, msgsC.getMessageCount());
+    }
+    
     /**
      * BrokerA <-> BrokerB <-> BrokerC
      */
@@ -284,9 +390,10 @@ public class ThreeBrokerTopicNetworkTest
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
-        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
-        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
-        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
+        String options = new String("?persistent=false&useJmx=false"); 
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
     }
     
     public static Test suite() {



Mime
View raw message