activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4791 - fix and test. removed the delay but left the warn if dispatch ocurrs before interruption processing is complete. Problem was a race between consumer close and sessions copy on write dispatchers
Date Tue, 08 Oct 2013 20:29:29 GMT
Updated Branches:
  refs/heads/trunk 3a0a9aae0 -> dc0291b29


https://issues.apache.org/jira/browse/AMQ-4791 - fix and test. removed the delay but left
the warn if dispatch ocurrs before interruption processing is complete. Problem was a race
between consumer close and sessions copy on write dispatchers list


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dc0291b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dc0291b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dc0291b2

Branch: refs/heads/trunk
Commit: dc0291b290bc6d4a9d77eede1dd319ea7db8f903
Parents: 3a0a9aa
Author: gtully <gary.tully@gmail.com>
Authored: Tue Oct 8 21:28:30 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Oct 8 21:29:17 2013 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQConnection.java |  47 +++---
 .../activemq/ActiveMQConnectionConsumer.java    |   6 +-
 .../activemq/ActiveMQMessageConsumer.java       |   6 +-
 .../org/apache/activemq/ActiveMQSession.java    |   6 +-
 .../activemq/state/ConnectionStateTracker.java  |   1 +
 .../FailoverConsumerOutstandingCommitTest.java  |   2 +
 .../failover/FailoverTransactionTest.java       | 149 ++++++++++++++++++-
 7 files changed, 180 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dc0291b2/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index a622944..5057323 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -198,7 +198,7 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
     private DestinationSource destinationSource;
     private final Object ensureConnectionInfoSentMutex = new Object();
     private boolean useDedicatedTaskRunner;
-    protected volatile CountDownLatch transportInterruptionProcessingComplete;
+    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
     private long consumerFailoverRedeliveryWaitPeriod;
     private Scheduler scheduler;
     private boolean messagePrioritySupported = true;
@@ -2023,19 +2023,21 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
 
     @Override
     public void transportInterupted() {
-        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size()
- (advisoryConsumer != null ? 1:0));
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
-        }
-        signalInterruptionProcessingNeeded();
-
+        transportInterruptionProcessingComplete.set(1);
         for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();)
{
             ActiveMQSession s = i.next();
-            s.clearMessagesInProgress();
+            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
         }
 
         for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
-            connectionConsumer.clearMessagesInProgress();
+            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
+        }
+
+        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("transport interrupted - processing required, dispatchers: " +
transportInterruptionProcessingComplete.get());
+            }
+            signalInterruptionProcessingNeeded();
         }
 
         for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();)
{
@@ -2462,33 +2464,23 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
     }
 
     protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException
{
-        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-        if (cdl != null) {
-            if (!closed.get() && !transportFailed.get() && cdl.getCount()>0)
{
-                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption
processing (" + cdl.getCount() + ") to complete..");
-                cdl.await(10, TimeUnit.SECONDS);
-            }
+        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0)
{
+            LOG.warn("dispatch with outstanding dispatch interruption processing count "
+ transportInterruptionProcessingComplete.get());
             signalInterruptionProcessingComplete();
         }
     }
 
     protected void transportInterruptionProcessingComplete() {
-        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-        if (cdl != null) {
-            cdl.countDown();
-            try {
-                signalInterruptionProcessingComplete();
-            } catch (InterruptedException ignored) {}
+        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
+            signalInterruptionProcessingComplete();
         }
     }
 
-    private void signalInterruptionProcessingComplete() throws InterruptedException {
-        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-        if (cdl.getCount()==0) {
+    private void signalInterruptionProcessingComplete() {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
+                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
+                        + " for:" + this.getConnectionInfo().getConnectionId());
             }
-            this.transportInterruptionProcessingComplete = null;
 
             FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
             if (failoverTransport != null) {
@@ -2498,8 +2490,7 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
                             + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
                 }
             }
-
-        }
+            transportInterruptionProcessingComplete.set(0);
     }
 
     private void signalInterruptionProcessingNeeded() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/dc0291b2/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
index 7dc96a2..ba4b9dd 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
@@ -20,6 +20,8 @@ package org.apache.activemq;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.IllegalStateException;
@@ -155,11 +157,9 @@ public class ActiveMQConnectionConsumer implements ConnectionConsumer,
ActiveMQD
         return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
     }
 
-    public void clearMessagesInProgress() {
+    public void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete)
{
         // future: may want to deal with rollback of in progress messages to track re deliveries
         // before indicating that all is complete.        
-        // Till there is a need, lets immediately allow dispatch
-        this.connection.transportInterruptionProcessingComplete();
     }
 
     public ConsumerInfo getConsumerInfo() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/dc0291b2/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index baf5233..39f55bf 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -687,7 +687,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
 
     void doClose() throws JMSException {
         // Store interrupted state and clear so that Transport operations don't
-        // throw InterruptedException and we ensure that resources are clened up.
+        // throw InterruptedException and we ensure that resources are cleaned up.
         boolean interrupted = Thread.interrupted();
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
@@ -1584,4 +1584,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
             }
         }
     }
+
+    public boolean hasMessageListener() {
+        return messageListener.get() != null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dc0291b2/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 3348526..0a96134 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
@@ -647,7 +648,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
         }
     }
 
-    void clearMessagesInProgress() {
+    void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
         executor.clearMessagesInProgress();
         // we are called from inside the transport reconnection logic
         // which involves us clearing all the connections' consumers
@@ -659,6 +660,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
         //
         for (final ActiveMQMessageConsumer consumer : consumers) {
             consumer.inProgressClearRequired();
+            transportInterruptionProcessingComplete.incrementAndGet();
             try {
                 connection.getScheduler().executeAfterDelay(new Runnable() {
                     public void run() {
@@ -2012,7 +2014,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
         }
         for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();)
{
             ActiveMQMessageConsumer consumer = i.next();
-            if (consumer.getMessageListener() != null) {
+            if (consumer.hasMessageListener()) {
                 throw new IllegalStateException("Cannot synchronously receive a message when
a MessageListener is set");
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dc0291b2/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
index 6a1ca36..8fbf81b 100755
--- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
+++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
@@ -406,6 +406,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                         if (ss != null) {
                             ss.removeConsumer(id);
                         }
+                        cs.getRecoveringPullConsumers().remove(id);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dc0291b2/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index 9115c15..9c08c81 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -167,6 +167,7 @@ public class FailoverConsumerOutstandingCommitTest {
                 LOG.info("producer started");
                 try {
                     produceMessage(producerSession, destination, prefetch * 2);
+                } catch (javax.jms.IllegalStateException SessionClosedExpectedOnShutdown)
{
                 } catch (JMSException e) {
                     e.printStackTrace();
                     fail("unexpceted ex on producer: " + e);
@@ -273,6 +274,7 @@ public class FailoverConsumerOutstandingCommitTest {
                 LOG.info("producer started");
                 try {
                     produceMessage(producerSession, destination, prefetch * 2);
+                } catch (javax.jms.IllegalStateException SessionClosedExpectedOnShutdown)
{
                 } catch (JMSException e) {
                     e.printStackTrace();
                     fail("unexpceted ex on producer: " + e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/dc0291b2/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index ed45b95..54a8a01 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -17,7 +17,9 @@
 package org.apache.activemq.transport.failover;
 
 import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerPlugin;
@@ -29,10 +31,12 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.SocketProxy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,9 +52,15 @@ import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
+import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.NoSuchElementException;
+import java.util.Stack;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -874,6 +884,139 @@ public class FailoverTransactionTest extends TestSupport {
         connection.close();
     }
 
+    public void testPoolingNConsumesAfterReconnect() throws Exception {
+        broker = createBroker(true);
+        setDefaultPersistenceAdapter(broker);
+
+        broker.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    int count = 0;
+
+                    @Override
+                    public void removeConsumer(ConnectionContext context, final ConsumerInfo
info) throws Exception {
+                        if (count++ == 1) {
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {
+                                public void run() {
+                                    LOG.info("Stopping broker on removeConsumer: " + info);
+                                    try {
+                                        broker.stop();
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+        });
+        broker.start();
+
+        Vector<Connection> connections = new Vector<Connection>();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        configureConnectionFactory(cf);
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+        produceMessage(producerSession, destination);
+        connection.close();
+
+        connection = cf.createConnection();
+        connection.start();
+        connections.add(connection);
+        final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        final int sessionCount = 10;
+        final Stack<Session> sessions = new Stack<Session>();
+        for (int i = 0; i < sessionCount; i++) {
+            sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+        }
+
+        final int consumerCount = 1000;
+        final Deque<MessageConsumer> consumers = new ArrayDeque<MessageConsumer>();
+        for (int i = 0; i < consumerCount; i++) {
+            consumers.push(consumerSession.createConsumer(destination));
+        }
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+
+        final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
+        final TransportListener delegate = failoverTransport.getTransportListener();
+        failoverTransport.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+                delegate.onCommand(command);
+            }
+
+            @Override
+            public void onException(IOException error) {
+                delegate.onException(error);
+            }
+
+            @Override
+            public void transportInterupted() {
+
+                LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
+                for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++)
{
+
+                    executorService.execute(new Runnable() {
+                        public void run() {
+                            MessageConsumer localConsumer = null;
+                            try {
+                                synchronized (delegate) {
+                                    localConsumer = consumers.pop();
+                                }
+                                localConsumer.receive(1);
+
+                                LOG.info("calling close() " + ((ActiveMQMessageConsumer)
localConsumer).getConsumerId());
+                                localConsumer.close();
+                            } catch (NoSuchElementException nse) {
+                            } catch (Exception ignored) {
+                                LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(),
ignored);
+                            }
+                        }
+                    });
+                }
+
+                delegate.transportInterupted();
+            }
+
+            @Override
+            public void transportResumed() {
+                delegate.transportResumed();
+            }
+        });
+
+
+        MessageConsumer consumer = null;
+        synchronized (delegate) {
+            consumer = consumers.pop();
+        }
+        LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+        consumer.close();
+
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false, url);
+        setDefaultPersistenceAdapter(broker);
+        broker.start();
+
+        consumer = consumerSession.createConsumer(destination);
+        LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+
+        Message msg = null;
+        for (int i = 0; i < 4 && msg == null; i++) {
+            msg = consumer.receive(1000);
+        }
+        LOG.info("post: from consumer1 received: " + msg);
+        assertNotNull("got message after failover", msg);
+        msg.acknowledge();
+
+        for (Connection c : connections) {
+            c.close();
+        }
+    }
+
     public void testAutoRollbackWithMissingRedeliveries() throws Exception {
         broker = createBroker(true);
         broker.start();
@@ -991,8 +1134,8 @@ public class FailoverTransactionTest extends TestSupport {
 
         final Vector<Exception> exceptions = new Vector<Exception>();
 
-        // commit may fail if other consumer gets the message on restart, it will be seen
a a duplicate on teh connection
-        // but with no transaciton and it pending on another consumer it will be posion
+        // commit may fail if other consumer gets the message on restart, it will be seen
as a duplicate on the connection
+        // but with no transaction and it pending on another consumer it will be poison
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit...");
@@ -1012,7 +1155,7 @@ public class FailoverTransactionTest extends TestSupport {
 
         // either message consumed or sent to dlq via poison on redelivery to wrong consumer
         // message should not be available again in any event
-        assertNull("consumer should not get rolledback on non redelivered message or duplicate",
consumer.receive(5000));
+        assertNull("consumer should not get rolled back on non redelivered message or duplicate",
consumer.receive(5000));
 
         // consumer replay is hashmap order dependent on a failover connection state recover
so need to deal with both cases
         if (exceptions.isEmpty()) {


Mime
View raw message