activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r760075 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/...
Date Mon, 30 Mar 2009 18:04:21 GMT
Author: gtully
Date: Mon Mar 30 18:04:20 2009
New Revision: 760075

URL: http://svn.apache.org/viewvc?rev=760075&view=rev
Log:
Fix duplicate detection of messages recovered when the space limit is reached, and fix
cursor cache re-enablement when free space becomes available (AMQ-2149).

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
Mon Mar 30 18:04:20 2009
@@ -91,7 +91,7 @@
     }
 
     /**
-     * Checks if this message has beeb seen before
+     * Checks if this message has been seen before
      * 
      * @param message
      * @return true if the message is a duplicate

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Mon Mar 30 18:04:20 2009
@@ -271,11 +271,22 @@
         this.useCache = useCache;
     }
 
-    public synchronized boolean  isDuplicate(MessageId messageId) {
+    public synchronized boolean isDuplicate(MessageId messageId) {
+        boolean unique = recordUniqueId(messageId);
+        rollback(messageId);
+        return !unique;
+    }
+    
+    /**
+     * records a message id and checks if it is a duplicate
+     * @param messageId
+     * @return true if id is unique, false otherwise.
+     */
+    public synchronized boolean recordUniqueId(MessageId messageId) {
         if (!enableAudit || audit==null) {
-            return false;
+            return true;
         }
-        return audit.isDuplicate(messageId);
+        return !audit.isDuplicate(messageId);
     }
     
     public synchronized void rollback(MessageId id) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Mar 30 18:04:20 2009
@@ -37,7 +37,7 @@
     protected final Destination regionDestination;
     private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message>
();
     private Iterator<Entry<MessageId, Message>> iterator = null;
-    protected boolean cacheEnabled=false;
+    private boolean cacheEnabled=false;
     protected boolean batchResetNeeded = true;
     protected boolean storeHasMessages = false;
     protected int size;
@@ -73,7 +73,7 @@
     
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception
{
         boolean recovered = false;
-        if (!isDuplicate(message.getMessageId())) {
+        if (recordUniqueId(message.getMessageId())) {
             if (!cached) {
                 message.setRegionDestination(regionDestination);
                 if( message.getMemoryUsage()==null ) {
@@ -157,6 +157,9 @@
         } else {
             if (cacheEnabled) {
                 cacheEnabled=false;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName()
+ " disabling cache on size:" + size);
+                }
                 // sync with store on disabling the cache
                 if (lastCachedId != null) {
                     setBatch(lastCachedId);
@@ -176,12 +179,15 @@
 
     public final synchronized void remove() {
         size--;
-        if (size==0 && isStarted() && useCache) {
-            cacheEnabled=true;
-        }
         if (iterator!=null) {
             iterator.remove();
         }
+        if (size==0 && isStarted() && useCache && hasSpace() &&
getStoreSize() == 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" enabling cache on last remove");
+            }
+            cacheEnabled=true;
+        }
     }
 
     public final synchronized void remove(MessageReference node) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Mon Mar 30 18:04:20 2009
@@ -17,14 +17,11 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.amq.AMQMessageStore;
-import org.apache.activemq.store.kahadaptor.KahaReferenceStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
Mon Mar 30 18:04:20 2009
@@ -26,5 +26,10 @@
     boolean recoverMessage(Message message) throws Exception;
     boolean recoverMessageReference(MessageId ref) throws Exception;
     boolean hasSpace();
+    /**
+     * check if ref is a duplicate but do not record the reference
+     * @param ref
+     * @return true if ref is a duplicate
+     */
     boolean isDuplicate(MessageId ref);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Mon Mar 30 18:04:20 2009
@@ -381,6 +381,9 @@
                     Entry<MessageId, ReferenceData> entry = iterator.next();
                     try {
                         if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue()))
{
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("adding message ref:" + entry.getKey());
+                            }
                             size++;
                         } else {
                             if (LOG.isDebugEnabled()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Mon Mar 30 18:04:20 2009
@@ -119,7 +119,7 @@
                 do {
                     ReferenceRecord msg = messageContainer.getValue(entry);
                     if (msg != null ) {
-                        if ( recoverReference(listener, msg)) {
+                        if (recoverReference(listener, msg)) {
                             count++;
                             lastBatchId = msg.getMessageId();
                         } else if (!listener.isDuplicate(new MessageId(msg.getMessageId())))
{
@@ -180,14 +180,6 @@
             lock.unlock();
         }
     }
-    
-    public void addReferenceFileIdsInUse() {
-        for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
-            .getNext(entry)) {
-            ReferenceRecord msg = (ReferenceRecord)messageContainer.getValue(entry);
-            addInterest(msg);
-        }
-    }
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
{
         removeMessage(ack.getLastMessageId());
@@ -274,6 +266,9 @@
         lock.lock();
         try {
             batchEntry = messageContainer.getEntry(startAfter);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("setBatch: " + startAfter);
+            }
         } finally {
             lock.unlock();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Mon Mar
30 18:04:20 2009
@@ -184,10 +184,6 @@
         onLimitChange();
     }
 
-    /*
-     * Sets the minimum number of percentage points the usage has to change
-     * before a UsageListener event is fired by the manager.
-     */
     public int getPercentUsage() {
         synchronized (usageMutex) {
             return percentUsage;
@@ -243,8 +239,9 @@
 
     private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
         if (debug) {
-            LOG.debug("Memory usage change.  from: " + oldPercentUsage + ", to: " + newPercentUsage);
-        }
+            LOG.info("Memory usage change.  from: " + oldPercentUsage + ", to: " + newPercentUsage);
+        }    
+             
         if (started.get()) {
             // Switching from being full to not being full..
             if (oldPercentUsage >= 100 && newPercentUsage < 100) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Mon
Mar 30 18:04:20 2009
@@ -30,17 +30,20 @@
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.Topic;
 
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.broker.util.LoggingBrokerPlugin;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -63,16 +66,18 @@
         
     private final String SEQ_NUM_PROPERTY = "seqNum";
 
-    final int MESSAGE_LENGTH_BYTES = 75000;
+    final int MESSAGE_LENGTH_BYTES = 75 * 1024;
     final int MAX_TO_SEND  = 1500;
     final long SLEEP_BETWEEN_SEND_MS = 3;
     final int NUM_SENDERS_AND_RECEIVERS = 10;
     final Object brokerLock = new Object();
-    
+        
     BrokerService broker;
     Vector<Throwable> exceptions = new Vector<Throwable>();
 
     private File dataDirFile;
+    final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
+    
     
     public void createBroker(Configurer configurer) throws Exception {
         broker = new BrokerService();
@@ -112,7 +117,7 @@
 
     private class Receiver implements MessageListener {
 
-        private final String queueName;
+        private final javax.jms.Destination dest;
 
         private final Connection connection;
 
@@ -124,13 +129,17 @@
         
         private String lastId = null;
 
-        public Receiver(String queueName) throws JMSException {
-            this.queueName = queueName;
+        public Receiver(javax.jms.Destination dest) throws JMSException {
+            this.dest = dest;
             connection = new ActiveMQConnectionFactory(BROKER_URL)
                     .createConnection();
+            connection.setClientID(dest.toString());
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            messageConsumer = session.createConsumer(new ActiveMQQueue(
-                    queueName));
+            if (ActiveMQDestination.transform(dest).isTopic()) {
+                messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString());
+            } else {
+                messageConsumer = session.createConsumer(dest);
+            }
             messageConsumer.setMessageListener(this);
             connection.start();
         }
@@ -147,22 +156,22 @@
             try {
                 final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
                 if ((seqNum % 500) == 0) {
-                    LOG.info(queueName + " received " + seqNum);
+                    LOG.info(dest + " received " + seqNum);
                 }
                 if (seqNum != nextExpectedSeqNum) {
-                    LOG.warn(queueName + " received " + seqNum
+                    LOG.warn(dest + " received " + seqNum
                             + " in msg: " + message.getJMSMessageID()
                             + " expected "
                             + nextExpectedSeqNum
                             + ", lastId: " + lastId 
                             + ", message:" + message);
-                    fail(queueName + " received " + seqNum + " expected "
+                    fail(dest + " received " + seqNum + " expected "
                             + nextExpectedSeqNum);
                 }
                 ++nextExpectedSeqNum;
                 lastId = message.getJMSMessageID();
             } catch (Throwable e) {
-                LOG.error(queueName + " onMessage error", e);
+                LOG.error(dest + " onMessage error", e);
                 exceptions.add(e);
             }
         }
@@ -171,7 +180,7 @@
 
     private class Sender implements Runnable {
 
-        private final String queueName;
+        private final javax.jms.Destination dest;
 
         private final Connection connection;
 
@@ -181,13 +190,12 @@
 
         private volatile long nextSequenceNumber = 0;
 
-        public Sender(String queueName) throws JMSException {
-            this.queueName = queueName;
+        public Sender(javax.jms.Destination dest) throws JMSException {
+            this.dest = dest;
             connection = new ActiveMQConnectionFactory(BROKER_URL)
                     .createConnection();
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            messageProducer = session.createProducer(new ActiveMQQueue(
-                    queueName));
+            messageProducer = session.createProducer(dest);
             messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
             connection.start();
         }
@@ -203,14 +211,14 @@
                     ++nextSequenceNumber;
                     messageProducer.send(message);
                 } catch (Exception e) {
-                    LOG.error(queueName + " send error", e);
+                    LOG.error(dest + " send error", e);
                     exceptions.add(e);
                 }
                 if (SLEEP_BETWEEN_SEND_MS > 0) {
                     try {
                         Thread.sleep(SLEEP_BETWEEN_SEND_MS);
                     } catch (InterruptedException e) {
-                        LOG.warn(queueName + " sleep interrupted", e);
+                        LOG.warn(dest + " sleep interrupted", e);
                     }
                 }
             }
@@ -240,7 +248,7 @@
             public void configure(BrokerService broker) throws Exception {
                 SystemUsage usage = new SystemUsage();
                 MemoryUsage memoryUsage = new MemoryUsage();
-                memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 5 * NUM_SENDERS_AND_RECEIVERS);
+                memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 10 * NUM_SENDERS_AND_RECEIVERS);
                 usage.setMemoryUsage(memoryUsage);
                 broker.setSystemUsage(usage);
                 
@@ -252,7 +260,8 @@
         verifyStats(false);
     }
 
-    public void testOrderWithRestartAndVMIndex() throws Exception {
+    // no need to run this unless there are some issues with the others
+    public void noProblem_testOrderWithRestartAndVMIndex() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
                 AMQPersistenceAdapterFactory persistenceFactory =
@@ -288,7 +297,10 @@
         });
         
         final Timer timer = new Timer();
-        schedualRestartTask(timer, null);
+        schedualRestartTask(timer, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {    
+            }
+        });
         
         try {
             verifyOrderedMessageReceipt();
@@ -300,29 +312,27 @@
     }
     
     
-    public void testOrderWithRestartAndNoCache() throws Exception {
+    public void x_testTopicOrderWithRestart() throws Exception {
+        plugins[0].setLogAll(true);
+        plugins[0].setLogInternalEvents(false);
+        
         
-        PolicyEntry noCache = new PolicyEntry();
-        noCache.setUseCache(false);
-        final PolicyMap policyMap = new PolicyMap();
-        policyMap.setDefaultEntry(noCache);
-
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
-                broker.setDestinationPolicy(policyMap);
-                broker.deleteAllMessages();
+                broker.deleteAllMessages();   
+                broker.setPlugins(plugins);
             }
         });
         
         final Timer timer = new Timer();
         schedualRestartTask(timer, new Configurer() {
             public void configure(BrokerService broker) throws Exception {
-                broker.setDestinationPolicy(policyMap);
+                broker.setPlugins(plugins);
             }
         });
         
         try {
-            verifyOrderedMessageReceipt();
+            verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
         } finally {
             timer.cancel();
         }
@@ -339,6 +349,7 @@
                 AMQPersistenceAdapterFactory persistenceFactory =
                     (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
                 persistenceFactory.setForceRecoverReferenceStore(true);
+                broker.setPlugins(plugins);
                 broker.deleteAllMessages();     
             }
         });
@@ -349,6 +360,7 @@
                 AMQPersistenceAdapterFactory persistenceFactory =
                     (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
                 persistenceFactory.setForceRecoverReferenceStore(true);
+                broker.setPlugins(plugins);
             }
         });
         
@@ -408,19 +420,24 @@
     }
     
     private void verifyOrderedMessageReceipt() throws Exception {
+        verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE);
+    }
+    
+    private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
         
         Vector<Thread> threads = new Vector<Thread>();
         Vector<Receiver> receivers = new Vector<Receiver>();
         
         for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
-            final String queueName = "test.queue." + i;
-            receivers.add(new Receiver(queueName));
-            Thread thread = new Thread(new Sender(queueName));
+            final javax.jms.Destination destination =
+                    ActiveMQDestination.createDestination("test.dest." + i, destinationType);
+            receivers.add(new Receiver(destination));
+            Thread thread = new Thread(new Sender(destination));
             thread.start();
             threads.add(thread);
         }
         
-        final long expiry = System.currentTimeMillis() + 1000 * 60 * 20;
+        final long expiry = System.currentTimeMillis() + 1000 * 60 * 30;
         while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis()
< expiry) {
             Thread sendThread = threads.firstElement();
             sendThread.join(1000*10);



Mime
View raw message