Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 13533 invoked from network); 13 Mar 2009 11:59:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 13 Mar 2009 11:59:40 -0000 Received: (qmail 56083 invoked by uid 500); 13 Mar 2009 11:59:40 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56063 invoked by uid 500); 13 Mar 2009 11:59:40 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56054 invoked by uid 99); 13 Mar 2009 11:59:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2009 04:59:40 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2009 11:59:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1EAA8238889F; Fri, 13 Mar 2009 11:59:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r753214 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/advisory/ test/java/org/apache/activemq/bugs/ Date: Fri, 13 Mar 2009 11:59:08 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090313115909.1EAA8238889F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Fri Mar 13 11:59:08 2009 New Revision: 753214 URL: http://svn.apache.org/viewvc?rev=753214&view=rev Log: resolve AMQ-2102|https://issues.apache.org/activemq/browse/AMQ-2102 - refactor message dispatch on slave to take account of subscription choice on the master, this ensures slave is in sync w.r.t outstanding acks. processDispatchNotification imoplemented by Queue type destinations which delegates to subscription after doing a dispatch, test demonstrates slve out of sync errors Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Fri Mar 13 11:59:08 2009 @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.ft; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.broker.Connection; @@ -28,6 +30,7 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; @@ -58,8 +61,9 @@ private static final Log LOG = LogFactory.getLog(MasterBroker.class); private Transport slave; private AtomicBoolean started = new AtomicBoolean(false); - private final Object addConsumerLock = new Object(); + private Map consumers = new ConcurrentHashMap(); + /** * Constructor * @@ -197,14 +201,19 @@ * @throws Exception */ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - // as master and slave do independent dispatch, the consumer add order between master and slave - // needs to be maintained - synchronized (addConsumerLock) { - sendSyncToSlave(info); - return super.addConsumer(context, info); - } + sendSyncToSlave(info); + consumers.put(info.getConsumerId(), info.getConsumerId()); + return super.addConsumer(context, info); } + @Override + public void removeConsumer(ConnectionContext context, ConsumerInfo info) + throws Exception { + super.removeConsumer(context, info); + consumers.remove(info.getConsumerId()); + sendSyncToSlave(new RemoveInfo(info.getConsumerId())); + } + /** * remove a subscription * @@ -317,7 +326,9 @@ if (messageDispatch.getMessage() != null) { Message msg = messageDispatch.getMessage(); mdn.setMessageId(msg.getMessageId()); - sendSyncToSlave(mdn); + if (consumers.containsKey(messageDispatch.getConsumerId())) { + sendSyncToSlave(mdn); + } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Mar 13 11:59:08 2009 @@ -418,6 +418,34 @@ Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId()); if (sub != null) { sub.processMessageDispatchNotification(messageDispatchNotification); + } else { + throw new JMSException("Slave broker out of sync with master - Subscription: " + + messageDispatchNotification.getConsumerId() + + " on " + messageDispatchNotification.getDestination() + + " does not exist for dispatch of message: " + + messageDispatchNotification.getMessageId()); + } + } + + /* + * For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch is deferred till + * the notification to ensure that the subscription chosen by the master is used. AMQ-2102 + */ + protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception { + Destination dest = null; + synchronized (destinationsMutex) { + dest = destinations.get(messageDispatchNotification.getDestination()); + } + if (dest != null) { + dest.processDispatchNotification(messageDispatchNotification); + } else { + throw new JMSException( + "Slave broker out of sync with master - Destination: " + + messageDispatchNotification.getDestination() + + " does not exist for consumer " + + messageDispatchNotification.getConsumerId() + + " with message: " + + messageDispatchNotification.getMessageId()); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Mar 13 11:59:08 2009 @@ -18,6 +18,8 @@ import java.io.IOException; +import javax.jms.JMSException; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; @@ -27,6 +29,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; @@ -485,4 +488,9 @@ } } } + + public void processDispatchNotification( + MessageDispatchNotification messageDispatchNotification) throws Exception { + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Fri Mar 13 11:59:08 2009 @@ -28,6 +28,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.Task; @@ -175,4 +176,12 @@ void isFull(ConnectionContext context,Usage usage); List getConsumers(); + + /** + * called on Queues in slave mode to allow dispatch to follow subscription choice of master + * @param messageDispatchNotification + * @throws Exception + */ + void processDispatchNotification( + MessageDispatchNotification messageDispatchNotification) throws Exception; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Fri Mar 13 11:59:08 2009 @@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.store.MessageStore; import org.apache.activemq.usage.MemoryUsage; @@ -259,4 +260,9 @@ public void setMaxBrowsePageSize(int maxPageSize) { next.setMaxBrowsePageSize(maxPageSize); } + + public void processDispatchNotification( + MessageDispatchNotification messageDispatchNotification) throws Exception { + next.processDispatchNotification(messageDispatchNotification); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Mar 13 11:59:08 2009 @@ -38,7 +38,6 @@ import javax.jms.InvalidSelectorException; import javax.jms.JMSException; -import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -48,15 +47,14 @@ import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; -import org.apache.activemq.broker.region.group.MessageGroupSet; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerInfo; @@ -65,7 +63,6 @@ import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; -import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.DeterministicTaskRunner; @@ -1001,7 +998,7 @@ * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate() { - boolean pageInMoreMessages = false; + boolean pageInMoreMessages = false; synchronized(iteratingMutex) { BrowserDispatch rd; while ((rd = getNextBrowserDispatch()) != null) { @@ -1244,13 +1241,13 @@ // Only page in the minimum number of messages which can be dispatched immediately. toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); } - if ((force || !consumers.isEmpty()) && toPageIn > 0) { - messages.setMaxBatchSize(toPageIn); + + if ((force || !consumers.isEmpty()) && toPageIn > 0) { int count = 0; result = new ArrayList(toPageIn); synchronized (messages) { try { - + messages.setMaxBatchSize(toPageIn); messages.reset(); while (messages.hasNext() && count < toPageIn) { MessageReference node = messages.next(); @@ -1326,7 +1323,8 @@ List consumers; synchronized (this.consumers) { - if (this.consumers.isEmpty()) { + if (this.consumers.isEmpty() || isSlave()) { + // slave dispatch happens in processDispatchNotification return list; } consumers = new ArrayList(this.consumers); @@ -1422,4 +1420,104 @@ return total; } + /* + * In slave mode, dispatch is ignored till we get this notification as the dispatch + * process is non deterministic between master and slave. + * On a notification, the actual dispatch to the subscription (as chosen by the master) + * is completed. + * (non-Javadoc) + * @see org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification) + */ + public void processDispatchNotification( + MessageDispatchNotification messageDispatchNotification) throws Exception { + // do dispatch + Subscription sub = getMatchingSubscription(messageDispatchNotification); + if (sub != null) { + MessageReference message = getMatchingMessage(messageDispatchNotification); + sub.add(message); + sub.processMessageDispatchNotification(messageDispatchNotification); + } + } + + private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception { + QueueMessageReference message = null; + MessageId messageId = messageDispatchNotification.getMessageId(); + + dispatchLock.lock(); + try { + synchronized (pagedInPendingDispatch) { + for(QueueMessageReference ref : pagedInPendingDispatch) { + if (messageId.equals(ref.getMessageId())) { + message = ref; + pagedInPendingDispatch.remove(ref); + break; + } + } + } + + if (message == null) { + synchronized (pagedInMessages) { + message = pagedInMessages.get(messageId); + } + } + + if (message == null) { + synchronized (messages) { + try { + messages.setMaxBatchSize(getMaxPageSize()); + messages.reset(); + while (messages.hasNext()) { + MessageReference node = messages.next(); + node.incrementReferenceCount(); + messages.remove(); + if (messageId.equals(node.getMessageId())) { + message = this.createMessageReference(node.getMessage()); + break; + } + } + } finally { + messages.release(); + } + } + } + + if (message == null) { + Message msg = loadMessage(messageId); + if (msg != null) { + message = this.createMessageReference(msg); + } + } + + } finally { + dispatchLock.unlock(); + } + if (message == null) { + throw new JMSException( + "Slave broker out of sync with master - Message: " + + messageDispatchNotification.getMessageId() + + " on " + messageDispatchNotification.getDestination() + + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + + messageDispatchNotification.getConsumerId()); + } + return message; + } + + /** + * Find a consumer that matches the id in the message dispatch notification + * @param messageDispatchNotification + * @return sub or null if the subscription has been removed before dispatch + * @throws JMSException + */ + private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) throws JMSException { + Subscription sub = null; + synchronized (consumers) { + for (Subscription s : consumers) { + if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) { + sub = s; + break; + } + } + } + return sub; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Fri Mar 13 11:59:08 2009 @@ -24,6 +24,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; @@ -64,4 +65,15 @@ } return inactiveDestinations; } + + /* + * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till + * the notification to ensure that the subscription chosen by the master is used. + * + * (non-Javadoc) + * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification) + */ + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { + processDispatchNotificationViaDestination(messageDispatchNotification); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Mar 13 11:59:08 2009 @@ -22,6 +22,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; import org.apache.commons.logging.Log; @@ -72,4 +73,16 @@ super.removeDestination(context, destination, timeout); } + + /* + * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till + * the notification to ensure that the subscription chosen by the master is used. + * + * (non-Javadoc) + * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification) + */ + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { + processDispatchNotificationViaDestination(messageDispatchNotification); + } + } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Fri Mar 13 11:59:08 2009 @@ -108,11 +108,8 @@ RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor( RegionBroker.class); - // REVISIT the following two are not dependable at the moment, off by a small number - // for some reason? The work for a COUNT < ~500 - // - //assertEquals("inflight match", rb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount()); - //assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount()); + assertEquals("inflight match", rb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount()); + assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount()); assertEquals("dequeues match", rb.getDestinationStatistics().getDequeues().getCount(), Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java?rev=753214&r1=753213&r2=753214&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java Fri Mar 13 11:59:08 2009 @@ -47,14 +47,15 @@ import org.apache.commons.logging.LogFactory; public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler { + + final static int MESSAGE_COUNT = 12120; + final static int NUM_CONSUMERS = 10; + final static int CONSUME_ALL = -1; - final static int MESSAGE_COUNT = 5120; - final static int NUM_CONSUMERS = 20; - private static final Log LOG = LogFactory.getLog(AMQ2102Test.class); - private final Map exceptions = new ConcurrentHashMap(); + private final static Map exceptions = new ConcurrentHashMap(); private class Consumer implements Runnable, ExceptionListener { private ActiveMQConnectionFactory connectionFactory; @@ -63,12 +64,14 @@ private boolean running; private org.omg.CORBA.IntHolder startup; private Thread thread; + private int numToProcessPerIteration; - Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id) { + Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) { this.connectionFactory = connectionFactory; this.queueName = queueName; this.startup = startup; name = "Consumer-" + queueName + "-" + id; + numToProcessPerIteration = numToProcess; thread = new Thread(this, name); } @@ -93,6 +96,7 @@ } public void onException(JMSException e) { + exceptions.put(Thread.currentThread(), e); error("JMS exception: ", e); } @@ -146,7 +150,13 @@ Session session = null; try { session = connection.createSession(true, Session.SESSION_TRANSACTED); - processMessages(session); + if (numToProcessPerIteration > 0) { + while(isRunning()) { + processMessages(session); + } + } else { + processMessages(session); + } } finally { if (session != null) { session.close(); @@ -189,7 +199,8 @@ } startup = null; } - while (isRunning()) { + int numToProcess = numToProcessPerIteration; + do { Message message = consumer.receive(5000); if (message != null) { @@ -201,7 +212,7 @@ session.rollback(); } } - } + } while ((numToProcess == CONSUME_ALL || --numToProcess > 0) && isRunning()); } public void run() { @@ -224,7 +235,7 @@ } } - private class Producer { + private class Producer implements ExceptionListener { private ActiveMQConnectionFactory connectionFactory; private String queueName; @@ -246,6 +257,7 @@ try { connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setExceptionListener(this); connection.start(); sendMessages(connection); @@ -302,6 +314,7 @@ sendMessages(session, replyQueue, consumer); } finally { consumer.close(); + session.commit(); } } @@ -326,9 +339,8 @@ } } - private void sendMessages(Session session, Destination replyQueue, MessageConsumer consumer) throws JMSException { + private void sendMessages(final Session session, Destination replyQueue, MessageConsumer consumer) throws JMSException { final org.omg.CORBA.IntHolder messageCount = new org.omg.CORBA.IntHolder(MESSAGE_COUNT); - consumer.setMessageListener(new MessageListener() { public void onMessage(Message reply) { if (reply instanceof TextMessage) { @@ -340,6 +352,15 @@ error("Problem processing reply", e); } messageCount.value--; + if (messageCount.value % 200 == 0) { + // ack a bunch of replys + info("acking via session commit: messageCount=" + messageCount.value); + try { + session.commit(); + } catch (JMSException e) { + error("Failed to commit with count: " + messageCount.value, e); + } + } messageCount.notify(); } } else { @@ -354,11 +375,7 @@ synchronized (messageCount) { while (messageCount.value > 0) { - if (messageCount.value % 100 == 0) { - // ack a bunch of replys - debug("acking via session commit: messageCount=" + messageCount.value); - session.commit(); - } + try { messageCount.wait(); } catch (InterruptedException e) { @@ -370,12 +387,21 @@ session.commit(); debug("All replies received..."); } + + public void onException(JMSException exception) { + LOG.error(exception); + exceptions.put(Thread.currentThread(), exception); + } } private static void debug(String message) { LOG.debug(message); } + private static void info(String message) { + LOG.info(message); + } + private static void error(String message) { LOG.error(message); } @@ -384,15 +410,17 @@ t.printStackTrace(); String msg = message + ": " + (t.getMessage() != null ? t.getMessage() : t.toString()); LOG.error(msg, t); + exceptions.put(Thread.currentThread(), t); fail(msg); } - private ArrayList createConsumers(ActiveMQConnectionFactory connectionFactory, String queueName, int max) { + private ArrayList createConsumers(ActiveMQConnectionFactory connectionFactory, String queueName, + int max, int numToProcessPerConsumer) { ArrayList consumers = new ArrayList(max); org.omg.CORBA.IntHolder startup = new org.omg.CORBA.IntHolder(max); for (int id = 0; id < max; id++) { - consumers.add(new Consumer(connectionFactory, queueName, startup, id)); + consumers.add(new Consumer(connectionFactory, queueName, startup, id, numToProcessPerConsumer)); } for (Consumer consumer : consumers) { consumer.start(); @@ -445,6 +473,7 @@ public void tearDown() throws Exception { master.stop(); slave.stop(); + exceptions.clear(); } public void testMasterSlaveBug() throws Exception { @@ -453,7 +482,7 @@ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + masterUrl + ")?randomize=false"); String queueName = "MasterSlaveBug"; - ArrayList consumers = createConsumers(connectionFactory, queueName, NUM_CONSUMERS); + ArrayList consumers = createConsumers(connectionFactory, queueName, NUM_CONSUMERS, CONSUME_ALL); Producer producer = new Producer(connectionFactory, queueName); producer.execute(new String[]{}); @@ -468,10 +497,31 @@ assertTrue(exceptions.isEmpty()); } + + public void testMasterSlaveBugWithStopStartConsumers() throws Exception { + + Thread.setDefaultUncaughtExceptionHandler(this); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + "failover:(" + masterUrl + ")?randomize=false"); + String queueName = "MasterSlaveBug"; + ArrayList consumers = createConsumers(connectionFactory, + queueName, NUM_CONSUMERS, 10); + + Producer producer = new Producer(connectionFactory, queueName); + producer.execute(new String[] {}); + + for (Consumer consumer : consumers) { + consumer.setRunning(false); + } + + for (Consumer consumer : consumers) { + consumer.join(); + } + assertTrue(exceptions.isEmpty()); + } + public void uncaughtException(Thread t, Throwable e) { error("" + t + e); exceptions.put(t,e); - - } }