activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1064660 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/bugs/
Date Fri, 28 Jan 2011 13:25:22 GMT
Author: gtully
Date: Fri Jan 28 13:25:22 2011
New Revision: 1064660

URL: http://svn.apache.org/viewvc?rev=1064660&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3167 - possible skipped Queue messages in memory
limited configuration with fast consumers
resolve off by one error in messageOrderIndex setBatch

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1064660&r1=1064659&r2=1064660&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Fri Jan 28 13:25:22 2011
@@ -168,27 +168,30 @@ public abstract class AbstractStoreCurso
     
     
     public final synchronized void addMessageLast(MessageReference node) throws Exception
{
-        if (!cacheEnabled && size==0 && isStarted() && useCache &&
hasSpace()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" enabling cache on empty add");
+        if (hasSpace()) {
+            if (!cacheEnabled && size==0 && isStarted() && useCache)
{
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+                            + " enabling cache for empty store " + node.getMessageId());
+                }
+                cacheEnabled=true;
             }
-            cacheEnabled=true;
-        }
-        if (cacheEnabled && hasSpace()) {
-            recoverMessage(node.getMessage(),true);
-            lastCachedId = node.getMessageId();
-        } else {
             if (cacheEnabled) {
-                cacheEnabled=false;
+                recoverMessage(node.getMessage(),true);
+                lastCachedId = node.getMessageId();
+            }
+        } else if (cacheEnabled) {
+            cacheEnabled=false;
+            // sync with store on disabling the cache
+            if (lastCachedId != null) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+ " disabling cache on size:" + size
-                            + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
-                            + " current node seqId: " + node.getMessageId().getBrokerSequenceId());
-                }
-                // sync with store on disabling the cache
-                if (lastCachedId != null) {
-                    setBatch(lastCachedId);
+                    LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+                            + " disabling cache on size:" + size
+                            + ", lastCachedId: " + lastCachedId
+                            + " current node Id: " + node.getMessageId());
                 }
+                setBatch(lastCachedId);
+                lastCachedId = null;
             }
         }
         this.storeHasMessages = true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1064660&r1=1064659&r2=1064660&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Fri Jan 28 13:25:22 2011
@@ -487,15 +487,13 @@ public class KahaDBStore extends Message
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator
-                                .hasNext()
-                                && listener.hasSpace();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
+                             listener.hasSpace() && iterator.hasNext(); ) {
                             entry = iterator.next();
                             Message msg = loadMessage(entry.getValue().location);
-                            //System.err.println("RECOVER " + msg.getMessageId().getProducerSequenceId());
                             listener.recoverMessage(msg);
                             counter++;
-                            if (counter >= maxReturned || listener.hasSpace() == false)
{
+                            if (counter >= maxReturned) {
                                 break;
                             }
                         }
@@ -531,7 +529,7 @@ public class KahaDBStore extends Message
                 // operations... but for now we must
                 // externally synchronize...
                
-                indexLock.readLock().lock();
+                indexLock.writeLock().lock();
                 try {
                         pageFile.tx().execute(new Transaction.Closure<IOException>()
{
                         public void execute(Transaction tx) throws IOException {
@@ -543,7 +541,7 @@ public class KahaDBStore extends Message
                         }
                     });
                 }finally {
-                    indexLock.readLock().unlock();
+                    indexLock.writeLock().unlock();
                 }
                 
             } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1064660&r1=1064659&r2=1064660&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Jan 28 13:25:22 2011
@@ -2136,18 +2136,18 @@ public class MessageDatabase extends Ser
             if (sequence != null) {
                 Long nextPosition = new Long(sequence.longValue() + 1);
                 if (defaultPriorityIndex.containsKey(tx, sequence)) {
-                    lastDefaultKey = nextPosition;
+                    lastDefaultKey = sequence;
                     cursor.defaultCursorPosition = nextPosition.longValue();
                 } else if (highPriorityIndex != null) {
                     if (highPriorityIndex.containsKey(tx, sequence)) {
-                        lastHighKey = nextPosition;
+                        lastHighKey = sequence;
                         cursor.highPriorityCursorPosition = nextPosition.longValue();
                     } else if (lowPriorityIndex.containsKey(tx, sequence)) {
-                        lastLowKey = nextPosition;
+                        lastLowKey = sequence;
                         cursor.lowPriorityCursorPosition = nextPosition.longValue();
                     }
                 } else {
-                    lastDefaultKey = nextPosition;
+                    lastDefaultKey = sequence;
                     cursor.defaultCursorPosition = nextPosition.longValue();
                 }
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java?rev=1064660&r1=1064659&r2=1064660&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java Fri
Jan 28 13:25:22 2011
@@ -18,6 +18,8 @@ package org.apache.activemq.bugs;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Vector;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -43,8 +45,14 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class AMQ2413Test extends CombinationTestSupport implements MessageListener {
+    private static final Log LOG = LogFactory.getLog(AMQ2413Test.class);
     BrokerService broker;
     private ActiveMQConnectionFactory factory;
 
@@ -53,6 +61,7 @@ public class AMQ2413Test extends Combina
     private static final int RECEIVER_THINK_TIME = 1;
     private static final int CONSUMER_COUNT = 1;
     private static final int PRODUCER_COUNT = 50;
+    private static final int TO_SEND = SEND_COUNT / PRODUCER_COUNT;
 
     public int deliveryMode = DeliveryMode.NON_PERSISTENT;
     public int ackMode = Session.DUPS_OK_ACKNOWLEDGE;
@@ -75,6 +84,9 @@ public class AMQ2413Test extends Combina
         broker = new BrokerService();
         broker.setDataDirectory("target" + File.separator + "test-data" + File.separator
+ "AMQ2401Test");
         broker.setDeleteAllMessagesOnStartup(true);
+
+        KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        kahaDb.setConcurrentStoreAndDispatchQueues(false);
         broker.addConnector("tcp://0.0.0.0:2401");
         PolicyMap policies = new PolicyMap();
         PolicyEntry entry = new PolicyEntry();
@@ -150,8 +162,9 @@ public class AMQ2413Test extends Combina
     public void onMessage(Message message) {
         receivedMessages.release();
         if (count.incrementAndGet() % 100 == 0) {
-            System.out.println("Received message " + count);
+            LOG.info("Received message " + count);
         }
+        track(message);
         if (RECEIVER_THINK_TIME > 0) {
             try {
                 Thread.currentThread().sleep(RECEIVER_THINK_TIME);
@@ -162,6 +175,26 @@ public class AMQ2413Test extends Combina
 
     }
 
+    HashMap<ProducerId, boolean[]> tracker = new HashMap<ProducerId, boolean[]>();
+    private synchronized void track(Message message) {
+        try {
+            MessageId id = new MessageId(message.getJMSMessageID());
+            ProducerId pid = id.getProducerId();
+            int seq = (int)id.getProducerSequenceId();
+            boolean[] ids = tracker.get(pid);
+            if (ids == null) {
+                ids = new boolean[TO_SEND + 1];
+                ids[seq] = true;
+                tracker.put(pid, ids);
+            } else {
+                assertTrue("not already received: " + id, !ids[seq]);
+                ids[seq] = true;
+            }
+        } catch (Exception e) {
+            LOG.error(e);
+        }
+    }
+
     /**
      * @throws InterruptedException
      * @throws TimeoutException
@@ -172,6 +205,7 @@ public class AMQ2413Test extends Combina
             while (count.get() < SEND_COUNT) {
                 if (!receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) {
                     if (count.get() == SEND_COUNT) break;
+                    verifyTracking();
                     throw new TimeoutException("@count=" + count.get() + " Message not received
for more than " + HANG_THRESHOLD + " seconds");
                 }
             }
@@ -180,6 +214,19 @@ public class AMQ2413Test extends Combina
         }
     }
 
+    private void verifyTracking() {
+        Vector<MessageId> missing = new Vector<MessageId>();
+        for (ProducerId pid : tracker.keySet()) {
+            boolean[] ids = tracker.get(pid);
+            for (int i=1; i<TO_SEND + 1; i++) {
+                if (!ids[i]) {
+                    missing.add(new MessageId(pid, i));
+                }
+            }
+        }
+        assertTrue("No missing messages: " + missing, missing.isEmpty());
+    }
+
     private interface Service {
         public void start() throws Exception;
 
@@ -210,12 +257,13 @@ public class AMQ2413Test extends Combina
 
         public void run() {
 
-            int count = SEND_COUNT / PRODUCER_COUNT;
-            for (int i = 1; i <= count; i++) {
+
+            int i = 1;
+            for (; i <= TO_SEND; i++) {
                 try {
 
                     if (+i % 100 == 0) {
-                        System.out.println(thread.currentThread().getName() + " Sending message
" + i);
+                        LOG.info(thread.currentThread().getName() + " Sending message " +
i);
                     }
                     message = session.createBytesMessage();
                     message.writeBytes(new byte[1024]);
@@ -226,6 +274,7 @@ public class AMQ2413Test extends Combina
                     break;
                 }
             }
+            LOG.info(thread.currentThread().getName() + " Sent: " + (i-1));
         }
 
         public void close() {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java?rev=1064660&r1=1064659&r2=1064660&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java Fri
Jan 28 13:25:22 2011
@@ -67,6 +67,7 @@ public class AMQ3145Test {
         broker.setUseJmx(true);
         broker.addConnector("tcp://localhost:0");
         broker.start();
+        broker.waitUntilStarted();
         factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
         connection = factory.createConnection();
         connection.start();



Mime
View raw message