activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r760721 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/usage/ test/java/org/apache/activemq/bugs/
Date Tue, 31 Mar 2009 23:10:04 GMT
Author: gtully
Date: Tue Mar 31 23:10:04 2009
New Revision: 760721

URL: http://svn.apache.org/viewvc?rev=760721&view=rev
Log:
Topic version of the test case for AMQ-2149 (https://issues.apache.org/activemq/browse/AMQ-2149),
with associated fixes to suppress duplicate and out-of-order messages.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=760721&r1=760720&r2=760721&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Tue Mar 31 23:10:04 2009
@@ -22,6 +22,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -44,6 +45,9 @@
     private static final Log LOG = LogFactory.getLog(KahaReferenceStore.class);
     protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
     protected KahaReferenceStoreAdapter adapter;
+    // keep track of dispatched messages so that duplicate sends that follow a successful
+    // dispatch can be suppressed.
+    protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit();
     private StoreEntry batchEntry;
     private String lastBatchId;
     protected final Lock lock = new ReentrantLock();
@@ -152,14 +156,13 @@
         boolean uniqueueReferenceAdded = false;
         lock.lock();
         try {
-            if (!messageContainer.containsKey(messageId)) {
+            if (!isDuplicate(messageId)) {
                 ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
                 messageContainer.put(messageId, record);
                 uniqueueReferenceAdded = true;
                 addInterest(record);
-            } else {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(destination.getPhysicalName() + " ignoring duplicated (add) message reference:"  + messageId);
+                    LOG.debug(destination.getPhysicalName() + " add: " + messageId);
                 }
             }
         } finally {
@@ -168,6 +171,24 @@
         return uniqueueReferenceAdded;
     }
 
+    protected boolean isDuplicate(final MessageId messageId) {
+        boolean duplicate = messageContainer.containsKey(messageId);
+        if (!duplicate) {
+            duplicate = dispatchAudit.isDuplicate(messageId);
+            if (duplicate) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(destination.getPhysicalName()
+                        + " ignoring duplicated (add) message reference, already dispatched: "
+                        + messageId);
+                }
+            }
+        } else if (LOG.isDebugEnabled()) {
+            LOG.debug(destination.getPhysicalName()
+                    + " ignoring duplicated (add) message reference, already in store: " + messageId);
+        }
+        return duplicate;
+    }
+    
     public ReferenceData getMessageReference(MessageId identity) throws IOException {
         lock.lock();
         try {
@@ -193,6 +214,10 @@
                 ReferenceRecord rr = messageContainer.remove(msgId);
                 if (rr != null) {
                     removeInterest(rr);
+                    dispatchAudit.isDuplicate(msgId);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
+                    }
                     if (messageContainer.isEmpty()
                         || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
                         || (batchEntry != null && batchEntry.equals(entry))) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=760721&r1=760720&r2=760721&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Tue Mar 31 23:10:04 2009
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -34,9 +35,11 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
-
+    private static final Log LOG = LogFactory.getLog(KahaTopicReferenceStore.class);
     protected ListContainer<TopicSubAck> ackContainer;
     protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
     private MapContainer<String, SubscriptionInfo> subscriberContainer;
@@ -75,15 +78,17 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public  boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
+    public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
                                     final ReferenceData data) {
+        boolean uniqueReferenceAdded = false;
         lock.lock();
         try {
             final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
             final int subscriberCount = subscriberMessages.size();
-            if (subscriberCount > 0) {
+            if (subscriberCount > 0 && !isDuplicate(messageId)) {
                 final StoreEntry messageEntry = messageContainer.place(messageId, record);
                 addInterest(record);
+                uniqueReferenceAdded = true;
                 final TopicSubAck tsa = new TopicSubAck();
                 tsa.setCount(subscriberCount);
                 tsa.setMessageEntry(messageEntry);
@@ -96,11 +101,14 @@
                     ref.setMessageId(messageId);
                     container.add(ref);
                 }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(destination.getPhysicalName() + " add reference: " + messageId);
+                }
             }
-        }finally {
+        } finally {
             lock.unlock();
         }
-        return true;
+        return uniqueReferenceAdded;
     }
 
     public ReferenceData getMessageReference(final MessageId identity) throws IOException {
@@ -159,16 +167,22 @@
                                 messageContainer.remove(entry);
                                 removeInterest(rr);
                                 removeMessage = true;
+                                dispatchAudit.isDuplicate(messageId);
                             }
                         }else {
                             ackContainer.update(entry,tsa);
                         }
                     }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(destination.getPhysicalName() + " remove: " + messageId);
+                    }
                 }else{
-           
                     if (ackContainer.isEmpty() || isUnreferencedBySubscribers(subscriberMessages, messageId)) {
                         // no message reference held        
                         removeMessage = true;
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (dup ack): " + messageId);
+                        }
                     }
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=760721&r1=760720&r2=760721&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Tue Mar 31 23:10:04 2009
@@ -239,7 +239,7 @@
 
     private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
         if (debug) {
-            LOG.info("Memory usage change.  from: " + oldPercentUsage + ", to: " + newPercentUsage);
+            LOG.debug("Memory usage change.  from: " + oldPercentUsage + ", to: " + newPercentUsage);
         }    
              
         if (started.get()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=760721&r1=760720&r2=760721&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Tue Mar 31 23:10:04 2009
@@ -35,13 +35,10 @@
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.util.LoggingBrokerPlugin;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
@@ -71,7 +68,7 @@
     final long SLEEP_BETWEEN_SEND_MS = 3;
     final int NUM_SENDERS_AND_RECEIVERS = 10;
     final Object brokerLock = new Object();
-        
+         
     BrokerService broker;
     Vector<Throwable> exceptions = new Vector<Throwable>();
 
@@ -83,6 +80,13 @@
         broker = new BrokerService();
         AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
         persistenceFactory.setDataDirectory(dataDirFile);
+        
+        SystemUsage usage = new SystemUsage();
+        MemoryUsage memoryUsage = new MemoryUsage();
+        memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
+        usage.setMemoryUsage(memoryUsage);
+        broker.setSystemUsage(usage);
+        
         broker.setPersistenceFactory(persistenceFactory);
 
         broker.addConnector(BROKER_CONNECTOR);        
@@ -242,24 +246,6 @@
         verifyStats(false);
     }
 
-    public void testOrderWithMemeUsageLimit() throws Exception {
-        
-        createBroker(new Configurer() {
-            public void configure(BrokerService broker) throws Exception {
-                SystemUsage usage = new SystemUsage();
-                MemoryUsage memoryUsage = new MemoryUsage();
-                memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 10 * NUM_SENDERS_AND_RECEIVERS);
-                usage.setMemoryUsage(memoryUsage);
-                broker.setSystemUsage(usage);
-                
-                broker.deleteAllMessages();            
-            }
-        });
-        
-        verifyOrderedMessageReceipt();
-        verifyStats(false);
-    }
-
     // no need to run this unless there are some issues with the others
     public void noProblem_testOrderWithRestartAndVMIndex() throws Exception {
         createBroker(new Configurer() {
@@ -312,24 +298,15 @@
     }
     
     
-    public void x_testTopicOrderWithRestart() throws Exception {
-        plugins[0].setLogAll(true);
-        plugins[0].setLogInternalEvents(false);
-        
-        
+    public void testTopicOrderWithRestart() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
-                broker.deleteAllMessages();   
-                broker.setPlugins(plugins);
+                broker.deleteAllMessages();
             }
         });
         
         final Timer timer = new Timer();
-        schedualRestartTask(timer, new Configurer() {
-            public void configure(BrokerService broker) throws Exception {
-                broker.setPlugins(plugins);
-            }
-        });
+        schedualRestartTask(timer, null);
         
         try {
             verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);



Mime
View raw message