activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r758678 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache...
Date Thu, 26 Mar 2009 15:23:59 GMT
Author: gtully
Date: Thu Mar 26 15:22:57 2009
New Revision: 758678

URL: http://svn.apache.org/viewvc?rev=758678&view=rev
Log:
resolve AMQ-2149|https://issues.apache.org/activemq/browse/AMQ-2149 - isses with kaha reference
store and duplicate messages, recovery of duplicaes and recovery with memory limits and session
duplicate acks resulting in out of order message dispatch, more detail in the jira and test
case

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Mar 26 15:22:57 2009
@@ -884,7 +884,7 @@
     }
 
     /**
-     * Acknowledge all the messages that have been delivered to the client upto
+     * Acknowledge all the messages that have been delivered to the client up to
      * this point.
      * 
      * @throws JMSException
@@ -1067,7 +1067,7 @@
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(getConsumerId() + " Ignoring Duplicate: " + md.getMessage());
                         }
-                        ackLater(md, MessageAck.STANDARD_ACK_TYPE);
+                        acknowledge(md);
                     }
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Thu Mar 26 15:22:57 2009
@@ -212,6 +212,9 @@
                 localTransactionEventListener.beginEvent();
             }
         }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Being:" + transactionId);
+        }
     }
 
     /**
@@ -230,6 +233,10 @@
         
         beforeEnd();
         if (transactionId != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Rollback:" + transactionId);
+            }
+
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.ROLLBACK);
             this.transactionId = null;
             this.connection.asyncSendPacket(info);
@@ -260,6 +267,10 @@
 
         // Only send commit if the transaction was started.
         if (transactionId != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit:" + transactionId);
+            }
+
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.COMMIT_ONE_PHASE);
             this.transactionId = null;
             // Notify the listener that the tx was committed back

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Thu Mar 26 15:22:57 2009
@@ -432,6 +432,10 @@
                 public boolean hasSpace() {
                     return true;
                 }
+                
+                public boolean isDuplicate(MessageId id) {
+                    return false;
+                }
             });
         } catch (Throwable e) {
             LOG.error("Failed to browse messages for Subscription " + view, e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Mar 26 15:22:57 2009
@@ -18,8 +18,6 @@
 
 import java.io.IOException;
 
-import javax.jms.JMSException;
-
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Mar 26 15:22:57 2009
@@ -21,6 +21,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -69,6 +71,8 @@
     protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
     private boolean slowConsumer;
 
+    private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
+    
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
         super(broker,context, info);
         this.usageManager=usageManager;
@@ -186,6 +190,15 @@
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched = false;
         Destination destination = null;
+        
+        if (!isSlave()) {
+            while(!okForAckAsDispatchDone.await(100, TimeUnit.MILLISECONDS)) {
+                LOG.warn("Ack before disaptch, waiting for recovery dispatch: " + ack);
+            }
+        }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("ack:" + ack);
+        }
         synchronized(dispatchLock) {
             if (ack.isStandardAck()) {
             	// First check if the ack matches the dispatched. When using failover this might
@@ -262,7 +275,7 @@
                 // this only happens after a reconnect - get an ack which is not
                 // valid
                 if (!callDispatchMatched) {
-                        LOG.error("Could not correlate acknowledgment with dispatched message:
"
+                    LOG.error("Could not correlate acknowledgment with dispatched message:
"
                                   + ack);
                 }
             } else if (ack.isIndividualAck()) {
@@ -608,6 +621,9 @@
         if (message == null) {
             return false;
         }
+        
+        okForAckAsDispatchDone.countDown();
+        
         // No reentrant lock - Patch needed to IndirectMessageReference on method lock
         if (!isSlave()) {
 
@@ -648,6 +664,9 @@
                 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
             }
         }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId());
+        }
         if (info.isDispatchAsync()) {
             try {
                 dispatchPending();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Mar 26 15:22:57 2009
@@ -217,6 +217,10 @@
                     public boolean hasSpace() {
                         return true;
                     }
+                    
+                    public boolean isDuplicate(MessageId id) {
+                        return false;
+                    }
                 });
             }else {
                 int messageCount = store.getMessageCount();
@@ -540,8 +544,7 @@
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack,
MessageReference node) throws IOException {
         messageConsumed(context, node);
         if (store != null && node.isPersistent()) {
-            // the original ack may be a ranged ack, but we are trying to delete
-            // a specific
+            // the original ack may be a ranged ack, but we are trying to delete a specific
             // message store here so we need to convert to a non ranged ack.
             if (ack.getMessageCount() > 0) {
                 // Dup the ack
@@ -1542,14 +1545,10 @@
 
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage) {
-            synchronized(messagesWaitingForSpace) {
-                if (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull())
{
-                    try {
-                        this.taskRunner.wakeup();
-                    } catch (InterruptedException e) {
-                        LOG.warn(getName() + " failed to wakeup task runner on usageChange:
" + e);
-                    }
-                }
+            try {
+                this.taskRunner.wakeup();
+            } catch (InterruptedException e) {
+                LOG.warn(getName() + " failed to wakeup task runner on usageChange: " + e);
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Mar 26 15:22:57 2009
@@ -238,6 +238,10 @@
                     public boolean hasSpace() {
                         return true;
                     }
+                    
+                    public boolean isDuplicate(MessageId id) {
+                        return false;
+                    }
                 });
             }
 
@@ -494,6 +498,10 @@
                     public boolean hasSpace() {
                         return true;
                     }
+                    
+                    public boolean isDuplicate(MessageId id) {
+                        return false;
+                    }
                 });
                 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                 if (msgs != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Thu Mar 26 15:22:57 2009
@@ -271,7 +271,7 @@
         this.useCache = useCache;
     }
 
-    protected synchronized boolean  isDuplicate(MessageId messageId) {
+    public synchronized boolean  isDuplicate(MessageId messageId) {
         if (!enableAudit || audit==null) {
             return false;
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Mar 26 15:22:57 2009
@@ -71,7 +71,8 @@
         return recoverMessage(message,false);
     }
     
-    public synchronized boolean recoverMessage(Message message, boolean cached)throws Exception
{
+    public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception
{
+        boolean recovered = false;
         if (!isDuplicate(message.getMessageId())) {
             if (!cached) {
                 message.setRegionDestination(regionDestination);
@@ -82,13 +83,14 @@
             message.incrementReferenceCount();
             batchList.put(message.getMessageId(), message);
             clearIterator(true);
+            recovered = true;
         } else {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Ignoring batched duplicated from store: " + message);
+                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" cursor got duplicate: " + message);
             }
             storeHasMessages = true;
         }
-        return true;
+        return recovered;
     }
     
     public final void reset() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Thu Mar 26 15:22:57 2009
@@ -18,7 +18,6 @@
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
Thu Mar 26 15:22:57 2009
@@ -26,4 +26,5 @@
     boolean recoverMessage(Message message) throws Exception;
     boolean recoverMessageReference(MessageId ref) throws Exception;
     boolean hasSpace();
+    boolean isDuplicate(MessageId ref);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
Thu Mar 26 15:22:57 2009
@@ -67,8 +67,9 @@
 
     /**
      * Adds a message reference to the message store
+     * @return true if reference was added, false if it is a duplicate and not added
      */
-    void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData
data) throws IOException;
+    boolean addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData
data) throws IOException;
 
     /**
      * Looks up a message using either the String messageID or the

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Thu Mar 26 15:22:57 2009
@@ -380,13 +380,19 @@
                 while (iterator.hasNext()) {
                     Entry<MessageId, ReferenceData> entry = iterator.next();
                     try {
-                        referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
+                        if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue()))
{
+                            size++;
+                        } else {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("not adding duplicate reference: " + entry.getKey()
+ ", " + entry.getValue());
+                            }
+                        }
                         AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this,
entry
                                 .getValue().getFileId());
                     } catch (Throwable e) {
                         LOG.warn("Message could not be added to long term store: " + e.getMessage(),
e);
                     }
-                    size++;
+                    
                     // Commit the batch if it's getting too big
                     if (size >= maxCheckpointMessageAddSize) {
                         persitanceAdapter.commitTransaction(context);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
Thu Mar 26 15:22:57 2009
@@ -40,6 +40,10 @@
         return listener.hasSpace();
     }
 
+    public boolean isDuplicate(MessageId id) {
+        return listener.isDuplicate(id);
+    }
+    
     public boolean recoverMessage(Message message) throws Exception {
         if (listener.hasSpace()) {
             listener.recoverMessage(message);
@@ -55,7 +59,8 @@
         if (message != null) {
             return recoverMessage(message);
         } else {
-            throw new IllegalStateException("Message id " + ref + " could not be recovered
from the data store - already dispatched");
+            throw new IllegalStateException("Message id " + ref + " could not be recovered
from the data store for: " + store.getDestination().getQualifiedName() 
+                    + " - already dispatched");
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Thu Mar 26 15:22:57 2009
@@ -29,9 +29,9 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
-import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -122,12 +122,18 @@
                         if ( recoverReference(listener, msg)) {
                             count++;
                             lastBatchId = msg.getMessageId();
-                        } else {
+                        } else if (!listener.isDuplicate(new MessageId(msg.getMessageId())))
{
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(destination.getQualifiedName() + " did not recover:"
+ msg.getMessageId());
+                                LOG.debug(destination.getQualifiedName() + " did not recover
(will retry) message: " + msg.getMessageId());
                             }
+                            // give usage limits a chance to reclaim
                             break;
-                        }
+                        } else {
+                            // skip duplicate and continue
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(destination.getQualifiedName() + " skipping duplicate,
" + msg.getMessageId());
+                            }
+                        }                        
                     } else {
                         lastBatchId = null;
                     }
@@ -140,16 +146,26 @@
         }
     }
 
-    public void addMessageReference(ConnectionContext context, MessageId messageId,
+    public boolean addMessageReference(ConnectionContext context, MessageId messageId,
                                                  ReferenceData data) throws IOException {
+        
+        boolean uniqueueReferenceAdded = false;
         lock.lock();
         try {
-            ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
-            messageContainer.put(messageId, record);
-            addInterest(record);
-        }finally {
+            if (!messageContainer.containsKey(messageId)) {
+                ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+                messageContainer.put(messageId, record);
+                uniqueueReferenceAdded = true;
+                addInterest(record);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(destination.getPhysicalName() + " ignoring duplicated (add)
message reference:"  + messageId);
+                }
+            }
+        } finally {
             lock.unlock();
         }
+        return uniqueueReferenceAdded;
     }
 
     public ReferenceData getMessageReference(MessageId identity) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Thu Mar 26 15:22:57 2009
@@ -75,7 +75,7 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public  void addMessageReference(final ConnectionContext context, final MessageId messageId,
+    public  boolean addMessageReference(final ConnectionContext context, final MessageId
messageId,
                                     final ReferenceData data) {
         lock.lock();
         try {
@@ -100,6 +100,7 @@
         }finally {
             lock.unlock();
         }
+        return true;
     }
 
     public ReferenceData getMessageReference(final MessageId identity) throws IOException
{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Thu Mar 26 15:22:57 2009
@@ -486,7 +486,9 @@
                         return;
 
                     } catch (IOException e) {
-                        LOG.debug("Send oneway attempt: " + i + " failed.");
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Send oneway attempt: " + i + " failed for command:"
+ command);   
+                        }
                         handleTransportFailure(e);
                     }
                 }
@@ -622,6 +624,9 @@
         }
         for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();)
{
             Command command = iter2.next();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("restore, replay: " + command);
+            }
             t.oneway(command);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu Mar
26 15:22:57 2009
@@ -268,7 +268,11 @@
                 }
             
             };
-            getExecutor().execute(listenerNotifier);
+            if (started.get()) {
+                getExecutor().execute(listenerNotifier);
+            } else {
+                LOG.warn("not notifying usage change to listeners on shutdown");
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Thu
Mar 26 15:22:57 2009
@@ -22,8 +22,6 @@
 import java.util.TimerTask;
 import java.util.Vector;
 
-import junit.framework.TestCase;
-
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
@@ -33,11 +31,15 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
 import org.apache.activemq.usage.MemoryUsage;
@@ -53,7 +55,7 @@
 
     private static final Log LOG = LogFactory.getLog(AMQ2149Test.class);
 
-    private static final long BROKER_STOP_PERIOD = 15 * 1000;
+    private static final long BROKER_STOP_PERIOD = 20 * 1000;
 
     private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
     private static final String BROKER_URL = "failover:("+ BROKER_CONNECTOR
@@ -62,7 +64,7 @@
     private final String SEQ_NUM_PROPERTY = "seqNum";
 
     final int MESSAGE_LENGTH_BYTES = 75000;
-    final int MAX_TO_SEND  = 2000;
+    final int MAX_TO_SEND  = 1500;
     final long SLEEP_BETWEEN_SEND_MS = 3;
     final int NUM_SENDERS_AND_RECEIVERS = 10;
     final Object brokerLock = new Object();
@@ -144,7 +146,7 @@
         public void onMessage(Message message) {
             try {
                 final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
-                if ((seqNum % 100) == 0) {
+                if ((seqNum % 500) == 0) {
                     LOG.info(queueName + " received " + seqNum);
                 }
                 if (seqNum != nextExpectedSeqNum) {
@@ -192,7 +194,7 @@
 
         public void run() {
             final String longString = buildLongString();
-            while (nextSequenceNumber <= MAX_TO_SEND) {
+            while (nextSequenceNumber < MAX_TO_SEND) {
                 try {
                     final Message message = session
                             .createTextMessage(longString);
@@ -219,7 +221,20 @@
         }
     }
 
-    public void x_testOrderWithMemeUsageLimit() throws Exception {
+    // no need to run this unless there are some issues with the others
+    public void vanilaVerify_testOrder() throws Exception {
+        
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+               broker.deleteAllMessages();            
+            }
+        });
+        
+        verifyOrderedMessageReceipt();
+        verifyStats(false);
+    }
+
+    public void testOrderWithMemeUsageLimit() throws Exception {
         
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
@@ -234,9 +249,10 @@
         });
         
         verifyOrderedMessageReceipt();
+        verifyStats(false);
     }
 
-    public void testOrderWithRestartVMIndex() throws Exception {
+    public void testOrderWithRestartAndVMIndex() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
                 AMQPersistenceAdapterFactory persistenceFactory =
@@ -260,10 +276,11 @@
         } finally {
             timer.cancel();
         }
+        verifyStats(true);
     }
 
 
-    public void x_testOrderWithRestart() throws Exception {
+    public void testOrderWithRestart() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
                 broker.deleteAllMessages();     
@@ -278,10 +295,45 @@
         } finally {
             timer.cancel();
         }
+        
+        verifyStats(true);
     }
     
+    
+    public void testOrderWithRestartAndNoCache() throws Exception {
+        
+        PolicyEntry noCache = new PolicyEntry();
+        noCache.setUseCache(false);
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(noCache);
+
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.setDestinationPolicy(policyMap);
+                broker.deleteAllMessages();
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.setDestinationPolicy(policyMap);
+            }
+        });
+        
+        try {
+            verifyOrderedMessageReceipt();
+        } finally {
+            timer.cancel();
+        }
+        
+        verifyStats(true);
+    }
 
-    public void x_testOrderWithRestartWithForceRecover() throws Exception {
+
+    // no need to run this unless there are issues with the other restart tests
+  
+    public void eaiserToRepoduce_testOrderWithRestartWithForceRecover() throws Exception
{
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
                 AMQPersistenceAdapterFactory persistenceFactory =
@@ -297,11 +349,6 @@
                 AMQPersistenceAdapterFactory persistenceFactory =
                     (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
                 persistenceFactory.setForceRecoverReferenceStore(true);
-//                PolicyEntry auditDepthPolicy = new PolicyEntry();
-//                auditDepthPolicy.setMaxAuditDepth(2000);
-//                PolicyMap policyMap = new PolicyMap();
-//                policyMap.setDefaultEntry(auditDepthPolicy);
-//                broker.setDestinationPolicy(policyMap);
             }
         });
         
@@ -310,6 +357,23 @@
         } finally {
             timer.cancel();
         }
+        
+        verifyStats(true);
+    }
+
+    private void verifyStats(boolean brokerRestarts) throws Exception {
+        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        
+        for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values())
{
+            DestinationStatistics stats = dest.getDestinationStatistics();
+            if (brokerRestarts) {
+                assertTrue("qneue/dequeue match for: " + dest.getName(),
+                        stats.getEnqueues().getCount() <= stats.getDequeues().getCount());
+            } else {
+                assertEquals("qneue/dequeue match for: " + dest.getName(),
+                        stats.getEnqueues().getCount(), stats.getDequeues().getCount());
  
+            }
+        }
     }
 
     private void schedualRestartTask(final Timer timer, final Configurer configurer) {
@@ -319,6 +383,7 @@
                     LOG.info("stopping broker..");
                     try {
                         broker.stop();
+                        broker.waitUntilStopped();
                     } catch (Exception e) {
                         LOG.error("ex on broker stop", e);
                         exceptions.add(e);
@@ -355,7 +420,7 @@
             threads.add(thread);
         }
         
-        final long expiry = System.currentTimeMillis() + 1000 * 60 * 10;
+        final long expiry = System.currentTimeMillis() + 1000 * 60 * 20;
         while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis()
< expiry) {
             Thread sendThread = threads.firstElement();
             sendThread.join(1000*10);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java
Thu Mar 26 15:22:57 2009
@@ -24,10 +24,14 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.activemq.kaha.impl.container.BaseContainerImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import junit.framework.TestCase;
 
 public class MapContainerTest extends TestCase {
-
+    private static final Log LOG = LogFactory.getLog(MapContainerTest.class);
     protected static final int COUNT = 10;
 
     protected String name = "test";
@@ -180,6 +184,71 @@
         assertTrue(container.isEmpty());
     }
 
+    
+    public void testDuplicatesOk() throws Exception {
+        StoreEntry first, entry; 
+        
+        container.put("M1", "DD");
+        first = container.getFirst();
+        LOG.info("First=" + first);
+        assertEquals(-1, first.getNextItem());
+        
+        // add duplicate
+        String old = container.put("M1", "DD");
+        assertNotNull(old);
+        assertEquals(1, container.size());
+        
+        entry = container.getFirst();
+        LOG.info("New First=" + entry);
+        assertEquals(-1, entry.getNextItem());
+
+        assertEquals(first, entry);
+        
+        container.remove("M1");
+        
+        entry = container.getFirst();
+        assertNull(entry);
+    }
+
+    
+    public void testDuplicatesFreeListShared() throws Exception {
+        StoreEntry batchEntry; 
+        
+        MapContainer other = store.getMapContainer(getName()+"2", "test", true);
+        other.load();
+        other.put("M1", "DD");
+             
+        container.put("M1", "DD");
+        batchEntry = container.getFirst();
+        LOG.info("First=" + batchEntry);
+        assertEquals(-1, batchEntry.getNextItem());
+        
+        // have something on free list before duplicate
+        other.remove("M1");
+        
+        // add duplicate
+        String old = container.put("M1", "DD");
+        assertNotNull(old);
+        assertEquals(1, container.size());
+
+        // entry now on free list on its own
+        batchEntry = container.refresh(batchEntry);
+        assertEquals(-1, batchEntry.getNextItem());
+        LOG.info("refreshed=" + batchEntry);
+        
+        // ack
+        container.remove("M1");   
+        
+        //container is valid  (empty)
+        assertNull(container.getFirst());
+
+        // batchEntry now has next as there is another on the free list
+        batchEntry = container.refresh(batchEntry);
+        LOG.info("refreshed=" + batchEntry);
+        
+        assertTrue(batchEntry.getNextItem() != -1);        
+    }
+
     protected Store getStore() throws IOException {
         return StoreFactory.open(name, "rw");
     }
@@ -188,7 +257,7 @@
         super.setUp();
         name = System.getProperty("basedir", ".") + "/target/activemq-data/map-container.db";
         store = getStore();
-        container = store.getMapContainer("test", "test", true);
+        container = store.getMapContainer(getName(), "test", true);
         container.load();
         testMap = new HashMap<String, String>();
         for (int i = 0; i < COUNT; i++) {

Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java?rev=758678&r1=758677&r2=758678&view=diff
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java
(original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java
Thu Mar 26 15:22:57 2009
@@ -66,7 +66,7 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData
data) throws IOException {
+    public boolean addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData
data) throws IOException {
         EntityManager manager = adapter.beginEntityManager(context);
         try {
 
@@ -85,6 +85,7 @@
             throw IOExceptionSupport.create(e);
         }
         adapter.commitEntityManager(context, manager);
+        return true;
     }
 
     public ReferenceData getMessageReference(MessageId identity) throws IOException {



Mime
View raw message