activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1328413 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/usecases/
Date Fri, 20 Apr 2012 15:36:10 GMT
Author: gtully
Date: Fri Apr 20 15:36:10 2012
New Revision: 1328413

URL: http://svn.apache.org/viewvc?rev=1328413&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3805 - further resolution, dispatch and activate
could clash over the audit resulting in skipped messages in the batch, the end result being
out of order dispatch. Additional test and fix up of use of cache and sync between durable
sub and prefetch sub

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1328413&r1=1328412&r2=1328413&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Fri Apr 20 15:36:10 2012
@@ -167,40 +167,37 @@ public class DurableTopicSubscription ex
         active.set(false);
         offlineTimestamp.set(System.currentTimeMillis());
         this.usageManager.getMemoryUsage().removeUsageListener(this);
-        synchronized (pending) {
+        synchronized (pendingLock) {
             pending.stop();
-        }
-        for (Iterator<Destination> iter = durableDestinations.values().iterator();
iter.hasNext();) {
-            Topic topic = (Topic)iter.next();
-            if (!keepDurableSubsActive) {
-                topic.deactivate(context, this);
-            } else {
-                topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
-            }
-        }
 
-        for (final MessageReference node : dispatched) {
-            // Mark the dispatched messages as redelivered for next time.
-            Integer count = redeliveredMessages.get(node.getMessageId());
-            if (count != null) {
-                redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue()
+ 1));
-            } else {
-                redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
-            }
-            if (keepDurableSubsActive && pending.isTransient()) {
-                synchronized (pending) {
-                    pending.addMessageFirst(node);
-                    pending.rollback(node.getMessageId());
+            synchronized (dispatchLock) {
+                for (Iterator<Destination> iter = durableDestinations.values().iterator();
iter.hasNext();) {
+                    Topic topic = (Topic)iter.next();
+                    if (!keepDurableSubsActive) {
+                        topic.deactivate(context, this);
+                    } else {
+                        topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
+                    }
                 }
-            } else {
-                node.decrementReferenceCount();
+
+                for (final MessageReference node : dispatched) {
+                    // Mark the dispatched messages as redelivered for next time.
+                    Integer count = redeliveredMessages.get(node.getMessageId());
+                    if (count != null) {
+                        redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue()
+ 1));
+                    } else {
+                        redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
+                    }
+                    if (keepDurableSubsActive && pending.isTransient()) {
+                        pending.addMessageFirst(node);
+                        pending.rollback(node.getMessageId());
+                    } else {
+                        node.decrementReferenceCount();
+                    }
+                }
+                dispatched.clear();
             }
-        }
-        synchronized(dispatched) {
-            dispatched.clear();
-        }
-        if (!keepDurableSubsActive && pending.isTransient()) {
-            synchronized (pending) {
+            if (!keepDurableSubsActive && pending.isTransient()) {
                 try {
                     pending.reset();
                     while (pending.hasNext()) {
@@ -286,7 +283,7 @@ public class DurableTopicSubscription ex
      * Release any references that we are holding.
      */
     public void destroy() {
-        synchronized (pending) {
+        synchronized (pendingLock) {
             try {
 
                 pending.reset();
@@ -300,7 +297,7 @@ public class DurableTopicSubscription ex
                 pending.clear();
             }
         }
-        synchronized(dispatched) {
+        synchronized  (dispatchLock) {
             for (MessageReference node : dispatched) {
                 node.decrementReferenceCount();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1328413&r1=1328412&r2=1328413&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Apr 20 15:36:10 2012
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +56,7 @@ public abstract class PrefetchSubscripti
     protected final Scheduler scheduler;
 
     protected PendingMessageCursor pending;
-    protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
+    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
     protected final AtomicInteger prefetchExtension = new AtomicInteger();
     protected boolean usePrefetchExtension = true;
     protected long enqueueCounter;
@@ -67,7 +66,7 @@ public abstract class PrefetchSubscripti
     private int maxAuditDepth=2048;
     protected final SystemUsage usageManager;
     protected final Object pendingLock = new Object();
-    private final Object dispatchLock = new Object();
+    protected final Object dispatchLock = new Object();
     private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
 
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
@@ -445,7 +444,7 @@ public abstract class PrefetchSubscripti
 
     /**
      * Checks an ack versus the contents of the dispatched list.
-     *
+     *  called with dispatchLock held
      * @param ack
      * @throws JMSException if it does not match
      */
@@ -658,6 +657,7 @@ public abstract class PrefetchSubscripti
         pending.setMaxBatchSize(numberToDispatch);
     }
 
+    // called with dispatchLock held
     protected boolean dispatch(final MessageReference node) throws IOException {
         final Message message = node.getMessage();
         if (message == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1328413&r1=1328412&r2=1328413&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Fri Apr 20 15:36:10 2012
@@ -172,6 +172,7 @@ public abstract class AbstractStoreCurso
     
     
     public final synchronized void addMessageLast(MessageReference node) throws Exception
{
+        boolean disableCache = false;
         if (hasSpace()) {
             if (!isCacheEnabled() && size==0 && isStarted() && useCache)
{
                 if (LOG.isTraceEnabled()) {
@@ -180,10 +181,19 @@ public abstract class AbstractStoreCurso
                 setCacheEnabled(true);
             }
             if (isCacheEnabled()) {
-                recoverMessage(node.getMessage(),true);
-                lastCachedId = node.getMessageId();
+                if (recoverMessage(node.getMessage(),true)) {
+                    lastCachedId = node.getMessageId();
+                } else {
+                    // failed to recover, possible duplicate from concurrent dispatchPending,
+                    // lets not recover further in case of out of order
+                    disableCache = true;
+                }
             }
-        } else if (isCacheEnabled()) {
+        } else {
+            disableCache = true;
+        }
+
+        if (disableCache && isCacheEnabled()) {
             setCacheEnabled(false);
             // sync with store on disabling the cache
             if (lastCachedId != null) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java?rev=1328413&r1=1328412&r2=1328413&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java
Fri Apr 20 15:36:10 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.usecases;
 
 import java.io.File;
+import java.net.URL;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -91,7 +92,8 @@ public class DurableSubProcessConcurrent
     static final Vector<Throwable> exceptions = new Vector<Throwable>();
 
     // long form of test that found https://issues.apache.org/jira/browse/AMQ-3805
-    @Ignore ("short version in org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoDuplicateOnConcurrentSendTranCommitAndActivate")
+    @Ignore ("short version in org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoDuplicateOnConcurrentSendTranCommitAndActivate"
+     + " and org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testOrderOnActivateDeactivate")
     @Test
     public void testProcess() {
         try {
@@ -125,12 +127,16 @@ public class DurableSubProcessConcurrent
         //allow the clients to unsubscribe before finishing
         clientManager.setEnd(true);
         try {
-			Thread.sleep(600000);
+			Thread.sleep(60 * 1000);
 		} catch (InterruptedException e) {
 			 exit("ProcessTest.testProcess failed.", e);
 		}
-        
-        
+
+        server.done = true;
+
+        try {
+            server.join(60*1000);
+        } catch (Exception ignored) {}
         processLock.writeLock().lock();
         assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
         LOG.info("DONE.");
@@ -177,7 +183,7 @@ public class DurableSubProcessConcurrent
         int transRover = 0;
         int messageRover = 0;
         public volatile int committingTransaction = -1;        
-
+        public boolean  done = false;
         public Server() {
             super("Server");
             setPriority(Thread.MIN_PRIORITY);
@@ -187,7 +193,7 @@ public class DurableSubProcessConcurrent
         @Override
         public void run() {
             try {
-                while (true) {
+                while (!done) {
 
                 	Thread.sleep(1000);
                 	

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1328413&r1=1328412&r2=1328413&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Fri Apr 20 15:36:10 2012
@@ -102,6 +102,7 @@ public class DurableSubscriptionOfflineT
         broker.getManagementContext().setCreateConnector(false);
         broker.setAdvisorySupport(false);
         broker.setKeepDurableSubsActive(keepDurableSubsActive);
+        broker.addConnector("tcp://0.0.0.0:0");
 
         if (usePrioritySupport) {
             PolicyEntry policy = new PolicyEntry();
@@ -1056,6 +1057,119 @@ public class DurableSubscriptionOfflineT
         assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
     }
 
+    public void testOrderOnActivateDeactivate() throws Exception {
+        for (int i=0;i<10;i++) {
+            LOG.info("Iteration: " + i);
+            doTestOrderOnActivateDeactivate();
+            broker.stop();
+            createBroker(true /*deleteAllMessages*/);
+        }
+    }
+
+    public void doTestOrderOnActivateDeactivate() throws Exception {
+        final int messageCount = 1000;
+        Connection con = null;
+        Session session = null;
+        final int numConsumers = 4;
+        for (int i = 0; i <= numConsumers; i++) {
+            con = createConnection("cli" + i);
+            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId", null, true);
+            session.close();
+            con.close();
+        }
+
+        final String url = "failover:(tcp://localhost:"
+            + (broker.getTransportConnectors().get(1).getConnectUri()).getPort()
+            + "?wireFormat.maxInactivityDuration=0)?"
+            + "jms.watchTopicAdvisories=false&"
+            + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
+            + "jms.sendAcksAsync=true&"
+            + "initialReconnectDelay=100&maxReconnectDelay=30000&"
+            + "useExponentialBackOff=true";
+        final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(url);
+
+        class CheckOrderClient implements Runnable {
+            final int id;
+            int runCount = 0;
+
+            public CheckOrderClient(int id) {
+                this.id = id;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    synchronized (this) {
+                        Connection con = clientFactory.createConnection();
+                        con.setClientID("cli" + id);
+                        con.start();
+                        Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                        MessageConsumer consumer = session.createDurableSubscriber(topic,
"SubsId", null, true);
+                        int nextId = 0;
+
+                        ++runCount;
+                        int i=0;
+                        for (; i < messageCount/2; i++) {
+                            Message message = consumer.receiveNoWait();
+                            if (message == null) {
+                                break;
+                            }
+                            long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
+                            assertEquals(id + " expected order: runCount: " + runCount  +
" id: " + message.getJMSMessageID(), ++nextId, producerSequenceId);
+                        }
+                        LOG.info(con.getClientID() + " peeked " + i);
+                        session.close();
+                        con.close();
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    exceptions.add(e);
+                }
+            }
+        }
+
+        Runnable producer = new Runnable() {
+            final String payLoad = new String(new byte[600]);
+
+            @Override
+            public void run() {
+                try {
+                    Connection con = createConnection();
+                    final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
+                    MessageProducer producer = sendSession.createProducer(topic);
+                    for (int i = 0; i < messageCount; i++) {
+                        producer.send(sendSession.createTextMessage(payLoad));
+                    }
+                    LOG.info("About to commit: " + messageCount);
+                    sendSession.commit();
+                    LOG.info("committed: " + messageCount);
+                    con.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptions.add(e);
+                }
+            }
+        };
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+        // concurrent commit and activate
+        for (int i = 0; i < numConsumers; i++) {
+            final CheckOrderClient client = new CheckOrderClient(i);
+            for (int j=0; j<100; j++) {
+                executorService.execute(client);
+            }
+        }
+        executorService.execute(producer);
+
+        executorService.shutdown();
+        executorService.awaitTermination(5, TimeUnit.MINUTES);
+        con.close();
+
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+
     public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");



Mime
View raw message