activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r753214 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/advisory/ test/java/org/apache/activemq/bugs/
Date Fri, 13 Mar 2009 11:59:08 GMT
Author: gtully
Date: Fri Mar 13 11:59:08 2009
New Revision: 753214

URL: http://svn.apache.org/viewvc?rev=753214&view=rev
Log:
resolve AMQ-2102|https://issues.apache.org/activemq/browse/AMQ-2102 - refactor message dispatch
on slave to take account of subscription choice on the master, this ensures slave is in sync
w.r.t. outstanding acks. processDispatchNotification implemented by Queue type destinations
which delegates to subscription after doing a dispatch, test demonstrates slave out of sync
errors

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Fri Mar 13 11:59:08 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.Connection;
@@ -28,6 +30,7 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
@@ -58,8 +61,9 @@
     private static final Log LOG = LogFactory.getLog(MasterBroker.class);
     private Transport slave;
     private AtomicBoolean started = new AtomicBoolean(false);
-    private final Object addConsumerLock = new Object();
 
+    private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId,
ConsumerId>();
+    
     /**
      * Constructor
      * 
@@ -197,14 +201,19 @@
      * @throws Exception
      */
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception {
-        // as master and slave do independent dispatch, the consumer add order between master
and slave
-        // needs to be maintained
-        synchronized (addConsumerLock) {
-    	    sendSyncToSlave(info);
-    	    return super.addConsumer(context, info);
-        }
+        sendSyncToSlave(info);
+        consumers.put(info.getConsumerId(), info.getConsumerId());
+        return super.addConsumer(context, info);
     }
 
+    @Override
+    public void removeConsumer(ConnectionContext context, ConsumerInfo info)
+            throws Exception {
+        super.removeConsumer(context, info);
+        consumers.remove(info.getConsumerId());
+        sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
+   }
+
     /**
      * remove a subscription
      * 
@@ -317,7 +326,9 @@
         if (messageDispatch.getMessage() != null) {
             Message msg = messageDispatch.getMessage();
             mdn.setMessageId(msg.getMessageId());
-            sendSyncToSlave(mdn);
+            if (consumers.containsKey(messageDispatch.getConsumerId())) {
+                sendSyncToSlave(mdn);
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Fri Mar 13 11:59:08 2009
@@ -418,6 +418,34 @@
         Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
         if (sub != null) {
             sub.processMessageDispatchNotification(messageDispatchNotification);
+        } else {
+            throw new JMSException("Slave broker out of sync with master - Subscription:
"
+                    + messageDispatchNotification.getConsumerId()
+                    + " on " + messageDispatchNotification.getDestination()
+                    + " does not exist for dispatch of message: "
+                    + messageDispatchNotification.getMessageId());
+        }
+    }
+    
+    /*
+     * For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch
is deferred till 
+     * the notification to ensure that the subscription chosen by the master is used. AMQ-2102
+     */ 
+    protected void processDispatchNotificationViaDestination(MessageDispatchNotification
messageDispatchNotification) throws Exception {
+        Destination dest = null;
+        synchronized (destinationsMutex) {
+            dest = destinations.get(messageDispatchNotification.getDestination());
+        }
+        if (dest != null) {
+            dest.processDispatchNotification(messageDispatchNotification);
+        } else {
+            throw new JMSException(
+                    "Slave broker out of sync with master - Destination: " 
+                            + messageDispatchNotification.getDestination()
+                            + " does not exist for consumer "
+                            + messageDispatchNotification.getConsumerId()
+                            + " with message: "
+                            + messageDispatchNotification.getMessageId());
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Fri Mar 13 11:59:08 2009
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 
+import javax.jms.JMSException;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
@@ -27,6 +29,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
@@ -485,4 +488,9 @@
             }
         }
     }
+    
+    public void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws Exception {
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Fri Mar 13 11:59:08 2009
@@ -28,6 +28,7 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.Task;
@@ -175,4 +176,12 @@
      void isFull(ConnectionContext context,Usage usage);
 
     List<Subscription> getConsumers();
+
+    /**
+     * called on Queues in slave mode to allow dispatch to follow subscription choice of
master
+     * @param messageDispatchNotification
+     * @throws Exception
+     */
+    void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws Exception;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Fri Mar 13 11:59:08 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
@@ -259,4 +260,9 @@
     public void setMaxBrowsePageSize(int maxPageSize) {
         next.setMaxBrowsePageSize(maxPageSize);
     }
+
+    public void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws Exception {
+        next.processDispatchNotification(messageDispatchNotification);   
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar 13 11:59:08 2009
@@ -38,7 +38,6 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
-import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -48,15 +47,14 @@
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
-import org.apache.activemq.broker.region.group.MessageGroupSet;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerInfo;
@@ -65,7 +63,6 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.DeterministicTaskRunner;
@@ -1001,7 +998,7 @@
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        boolean pageInMoreMessages = false;
+        boolean pageInMoreMessages = false;   
         synchronized(iteratingMutex) {
             BrowserDispatch rd;
 	        while ((rd = getNextBrowserDispatch()) != null) {
@@ -1244,13 +1241,13 @@
                 // Only page in the minimum number of messages which can be dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
             }
-            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
-                messages.setMaxBatchSize(toPageIn);
+            
+            if ((force || !consumers.isEmpty()) && toPageIn > 0) { 
                 int count = 0;
                 result = new ArrayList<QueueMessageReference>(toPageIn);
                 synchronized (messages) {
                     try {
-                      
+                        messages.setMaxBatchSize(toPageIn);
                         messages.reset();
                         while (messages.hasNext() && count < toPageIn) {
                             MessageReference node = messages.next();
@@ -1326,7 +1323,8 @@
         List<Subscription> consumers;
         
         synchronized (this.consumers) {
-            if (this.consumers.isEmpty()) {
+            if (this.consumers.isEmpty() || isSlave()) {
+                // slave dispatch happens in processDispatchNotification
                 return list;
             }
             consumers = new ArrayList<Subscription>(this.consumers);
@@ -1422,4 +1420,104 @@
         return total;
     }
 
+    /* 
+     * In slave mode, dispatch is ignored till we get this notification as the dispatch
+     * process is non deterministic between master and slave.
+     * On a notification, the actual dispatch to the subscription (as chosen by the master)

+     * is completed. 
+     * (non-Javadoc)
+     * @see org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+     */
+    public void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws Exception {
+        // do dispatch
+        Subscription sub = getMatchingSubscription(messageDispatchNotification);
+        if (sub != null) {
+            MessageReference message = getMatchingMessage(messageDispatchNotification);
+            sub.add(message);   
+            sub.processMessageDispatchNotification(messageDispatchNotification);
+        }
+    }
+
+    private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
throws Exception {
+        QueueMessageReference message = null;
+        MessageId messageId = messageDispatchNotification.getMessageId();
+        
+        dispatchLock.lock();
+        try {
+            synchronized (pagedInPendingDispatch) {
+               for(QueueMessageReference ref : pagedInPendingDispatch) {
+                   if (messageId.equals(ref.getMessageId())) {
+                       message = ref;
+                       pagedInPendingDispatch.remove(ref);
+                       break;
+                   }
+               }
+            }
+    
+            if (message == null) {
+                synchronized (pagedInMessages) {
+                    message = pagedInMessages.get(messageId);
+                }
+            }
+            
+            if (message == null) {            
+                synchronized (messages) {
+                    try {
+                        messages.setMaxBatchSize(getMaxPageSize());
+                        messages.reset();
+                        while (messages.hasNext()) {
+                            MessageReference node = messages.next();
+                            node.incrementReferenceCount();
+                            messages.remove();
+                            if (messageId.equals(node.getMessageId())) {
+                                message = this.createMessageReference(node.getMessage());
+                                break;
+                            }
+                        }
+                    } finally {
+                        messages.release();
+                    }
+                }
+            }
+            
+            if (message == null) {
+                Message msg = loadMessage(messageId);
+                if (msg != null) {
+                    message = this.createMessageReference(msg);
+                }
+            }          
+            
+        } finally {
+            dispatchLock.unlock();        
+        }
+        if (message == null) {
+            throw new JMSException(
+                    "Slave broker out of sync with master - Message: "
+                    + messageDispatchNotification.getMessageId()
+                    + " on " + messageDispatchNotification.getDestination()
+                    + " does not exist among pending(" + pagedInPendingDispatch.size() +
") for subscription: "
+                    + messageDispatchNotification.getConsumerId());
+        }
+        return message;
+    }
+
+    /**
+     * Find a consumer that matches the id in the message dispatch notification
+     * @param messageDispatchNotification
+     * @return sub or null if the subscription has been removed before dispatch
+     * @throws JMSException
+     */
+    private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
throws JMSException {
+        Subscription sub = null;
+        synchronized (consumers) {
+            for (Subscription s : consumers) {
+                if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId()))
{
+                    sub = s;
+                    break;
+                }
+            }
+        }
+        return sub;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
Fri Mar 13 11:59:08 2009
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -64,4 +65,15 @@
         }
         return inactiveDestinations;
     }
+    
+    /*
+     * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred
till 
+     * the notification to ensure that the subscription chosen by the master is used.
+     * 
+     * (non-Javadoc)
+     * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+     */
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception {
+        processDispatchNotificationViaDestination(messageDispatchNotification);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
Fri Mar 13 11:59:08 2009
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
@@ -72,4 +73,16 @@
 
         super.removeDestination(context, destination, timeout);
     }
+    
+    /*
+     * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred
till 
+     * the notification to ensure that the subscription chosen by the master is used.
+     * 
+     * (non-Javadoc)
+     * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+     */
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception {
+        processDispatchNotificationViaDestination(messageDispatchNotification);
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
Fri Mar 13 11:59:08 2009
@@ -108,11 +108,8 @@
         RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
                 RegionBroker.class);
 
-        // REVISIT the following two are not dependable at the moment, off by a small number
-        // for some reason? The work for a COUNT < ~500
-        //
-        //assertEquals("inflight match", rb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
-        //assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(),
masterRb.getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("inflight match", rb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
+        assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(),
masterRb.getDestinationStatistics().getEnqueues().getCount());
         
         assertEquals("dequeues match",
                 rb.getDestinationStatistics().getDequeues().getCount(),

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java Fri
Mar 13 11:59:08 2009
@@ -47,14 +47,15 @@
 import org.apache.commons.logging.LogFactory;
 
 public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
+       
+    final static int MESSAGE_COUNT = 12120;
+    final static int NUM_CONSUMERS = 10;
+    final static int CONSUME_ALL = -1;
     
     
-    final static int MESSAGE_COUNT = 5120;
-    final static int NUM_CONSUMERS = 20;
-    
     private static final Log LOG = LogFactory.getLog(AMQ2102Test.class);
     
-    private final Map<Thread, Throwable> exceptions = new ConcurrentHashMap<Thread,
Throwable>();
+    private final static Map<Thread, Throwable> exceptions = new ConcurrentHashMap<Thread,
Throwable>();
     
     private class Consumer implements Runnable, ExceptionListener {
         private ActiveMQConnectionFactory connectionFactory;
@@ -63,12 +64,14 @@
         private boolean running;
         private org.omg.CORBA.IntHolder startup;
         private Thread thread;
+        private int numToProcessPerIteration;
 
-        Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder
startup, int id) {
+        Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder
startup, int id, int numToProcess) {
             this.connectionFactory = connectionFactory;
             this.queueName = queueName;
             this.startup = startup;
             name = "Consumer-" + queueName + "-" + id;
+            numToProcessPerIteration = numToProcess;
             thread = new Thread(this, name);
         }
 
@@ -93,6 +96,7 @@
         }
 
         public void onException(JMSException e) {
+            exceptions.put(Thread.currentThread(), e);
             error("JMS exception: ", e);
         }
 
@@ -146,7 +150,13 @@
             Session session = null;
             try {
                 session = connection.createSession(true, Session.SESSION_TRANSACTED);
-                processMessages(session);
+                if (numToProcessPerIteration > 0) {
+                    while(isRunning()) {
+                        processMessages(session);
+                    }
+                } else {
+                    processMessages(session);
+                }
             } finally {
                 if (session != null) {
                     session.close();
@@ -189,7 +199,8 @@
                 }
                 startup = null;
             }
-            while (isRunning()) {
+            int numToProcess = numToProcessPerIteration;
+            do {
                 Message message = consumer.receive(5000);
 
                 if (message != null) {
@@ -201,7 +212,7 @@
                         session.rollback();
                     }
                 }
-            }
+            } while ((numToProcess == CONSUME_ALL || --numToProcess > 0) && isRunning());
         }
 
         public void run() {
@@ -224,7 +235,7 @@
         }
     }
     
-    private class Producer {
+    private class Producer implements ExceptionListener {
         private ActiveMQConnectionFactory connectionFactory;
         private String queueName;
         
@@ -246,6 +257,7 @@
 
             try {
                 connection = (ActiveMQConnection) connectionFactory.createConnection();
+                connection.setExceptionListener(this);
                 connection.start();
 
                 sendMessages(connection);
@@ -302,6 +314,7 @@
                 sendMessages(session, replyQueue, consumer);
             } finally {
                 consumer.close();
+                session.commit();
             }
         }
 
@@ -326,9 +339,8 @@
             }
         }
 
-        private void sendMessages(Session session, Destination replyQueue, MessageConsumer
consumer) throws JMSException {
+        private void sendMessages(final Session session, Destination replyQueue, MessageConsumer
consumer) throws JMSException {
             final org.omg.CORBA.IntHolder messageCount = new org.omg.CORBA.IntHolder(MESSAGE_COUNT);
-
             consumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message reply) {
                     if (reply instanceof TextMessage) {
@@ -340,6 +352,15 @@
                                 error("Problem processing reply", e);
                             }
                             messageCount.value--;
+                            if (messageCount.value % 200 == 0) {
+                                // ack a bunch of replys
+                                info("acking via session commit: messageCount=" + messageCount.value);
+                                try {
+                                    session.commit();
+                                } catch (JMSException e) {
+                                    error("Failed to commit with count: " + messageCount.value,
e);
+                                }
+                            }
                             messageCount.notify();
                         }
                     } else {
@@ -354,11 +375,7 @@
             synchronized (messageCount) {
                 while (messageCount.value > 0) {
                     
-                    if (messageCount.value % 100 == 0) {
-                        // ack a bunch of replys
-                        debug("acking via session commit: messageCount=" + messageCount.value);
-                        session.commit();
-                    }
+                    
                     try {
                         messageCount.wait();
                     } catch (InterruptedException e) {
@@ -370,12 +387,21 @@
             session.commit();
             debug("All replies received...");
         }
+
+        public void onException(JMSException exception) {
+           LOG.error(exception);
+           exceptions.put(Thread.currentThread(), exception);
+        }
     }
 
     private static void debug(String message) {
         LOG.debug(message);
     }
 
+    private static void info(String message) {
+        LOG.info(message);
+    }
+    
     private static void error(String message) {
         LOG.error(message);
     }
@@ -384,15 +410,17 @@
         t.printStackTrace();
         String msg = message + ": " + (t.getMessage() != null ? t.getMessage() : t.toString());
         LOG.error(msg, t);
+        exceptions.put(Thread.currentThread(), t);
         fail(msg);
     }
 
-    private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory connectionFactory,
String queueName, int max) {
+    private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory connectionFactory,
String queueName, 
+            int max, int numToProcessPerConsumer) {
         ArrayList<Consumer> consumers = new ArrayList<Consumer>(max);
         org.omg.CORBA.IntHolder startup = new org.omg.CORBA.IntHolder(max);
 
         for (int id = 0; id < max; id++) {
-            consumers.add(new Consumer(connectionFactory, queueName, startup, id));
+            consumers.add(new Consumer(connectionFactory, queueName, startup, id, numToProcessPerConsumer));
         }
         for (Consumer consumer : consumers) {
             consumer.start();
@@ -445,6 +473,7 @@
     public void tearDown() throws Exception {
         master.stop();
         slave.stop();
+        exceptions.clear();
     }
     
     public void testMasterSlaveBug() throws Exception {
@@ -453,7 +482,7 @@
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:("
+ 
                 masterUrl + ")?randomize=false");
         String queueName = "MasterSlaveBug";
-        ArrayList<Consumer> consumers = createConsumers(connectionFactory, queueName,
NUM_CONSUMERS);
+        ArrayList<Consumer> consumers = createConsumers(connectionFactory, queueName,
NUM_CONSUMERS, CONSUME_ALL);
         
         Producer producer = new Producer(connectionFactory, queueName);
         producer.execute(new String[]{});
@@ -468,10 +497,31 @@
         assertTrue(exceptions.isEmpty());
     }
 
+    
+    public void testMasterSlaveBugWithStopStartConsumers() throws Exception {
+
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                "failover:(" + masterUrl + ")?randomize=false");
+        String queueName = "MasterSlaveBug";
+        ArrayList<Consumer> consumers = createConsumers(connectionFactory,
+                queueName, NUM_CONSUMERS, 10);
+
+        Producer producer = new Producer(connectionFactory, queueName);
+        producer.execute(new String[] {});
+
+        for (Consumer consumer : consumers) {
+            consumer.setRunning(false);
+        }
+
+        for (Consumer consumer : consumers) {
+            consumer.join();
+        }
+        assertTrue(exceptions.isEmpty());
+    }
+
     public void uncaughtException(Thread t, Throwable e) {
         error("" + t + e);
         exceptions.put(t,e);
-        
-        
     }
 }



Mime
View raw message