activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r901273 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/transport/failover/ test/resources/
Date Wed, 20 Jan 2010 16:49:23 GMT
Author: gtully
Date: Wed Jan 20 16:49:22 2010
New Revision: 901273

URL: http://svn.apache.org/viewvc?rev=901273&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2573 - rollback of audit check needs
to be synced with redispatch after failover transport resumption, otherwise some redispatched
unconsumed messages can get auto-acked as duplicates in error

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
    activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=901273&r1=901272&r2=901273&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Wed Jan 20 16:49:22 2010
@@ -187,6 +187,7 @@
     private DestinationSource destinationSource;
     private final Object ensureConnectionInfoSentMutex = new Object();
     private boolean useDedicatedTaskRunner;
+    protected CountDownLatch transportInterruptionProcessingComplete;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -1674,6 +1675,7 @@
                 command.visit(new CommandVisitorAdapter() {
                     @Override
                     public Response processMessageDispatch(MessageDispatch md) throws Exception
{
+                        waitForTransportInterruptionProcessing();
                         ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
                         if (dispatcher != null) {
                             // Copy in case a embedded broker is dispatching via
@@ -1837,6 +1839,10 @@
 	}
 
     public void transportInterupted() {
+        transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() -
(advisoryConsumer != null ? 1:0));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
+        }
         for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();)
{
             ActiveMQSession s = i.next();
             s.clearMessagesInProgress();
@@ -2235,4 +2241,21 @@
 	public IOException getFirstFailureError() {
 		return firstFailureError;
 	}
+	
+	protected void waitForTransportInterruptionProcessing() throws InterruptedException {
+        if (transportInterruptionProcessingComplete != null) {
+            while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15,
TimeUnit.SECONDS)) {
+                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
+            }
+            synchronized (this) {
+                transportInterruptionProcessingComplete = null;
+            }
+        }
+    }
+	
+	protected synchronized void transportInterruptionProcessingComplete() {
+	    if (transportInterruptionProcessingComplete != null) {
+	        transportInterruptionProcessingComplete.countDown();
+	    }
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=901273&r1=901272&r2=901273&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Wed Jan 20 16:49:22 2010
@@ -103,7 +103,7 @@
     protected final ConsumerInfo info;
 
     // These are the messages waiting to be delivered to the client
-    private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+    protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
 
     // The are the messages that were delivered to the consumer but that have
     // not been acknowledged. It's kept in reverse order since we
@@ -640,14 +640,22 @@
     }
     
     void clearMessagesInProgress() {
-        // we are called from inside the transport reconnection logic
-        // which involves us clearing all the connections' consumers
-        // dispatch lists and clearing them
-        // so rather than trying to grab a mutex (which could be already
-        // owned by the message listener calling the send) we will just set
-        // a flag so that the list can be cleared as soon as the
-        // dispatch thread is ready to flush the dispatch list
+        // deal with delivered messages async to avoid lock contention with in pogress acks
         clearDispatchList = true;
+        synchronized (unconsumedMessages.getMutex()) {            
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
+            }
+            // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
+            List<MessageDispatch> list = unconsumedMessages.removeAll();
+            if (!this.info.isBrowser()) {
+                for (MessageDispatch old : list) {
+                    session.connection.rollbackDuplicate(this, old.getMessage());
+                }
+            }
+        }
+        // allow dispatch on this connection to resume
+        session.connection.transportInterruptionProcessingComplete();
     }
 
     void deliverAcks() {
@@ -755,9 +763,7 @@
      * broker to pull a message we are about to receive
      */
     protected void sendPullCommand(long timeout) throws JMSException {
-        synchronized (unconsumedMessages.getMutex()) {
-            clearDispatchListOnReconnect();
-        }
+        clearDispatchList();
         if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
@@ -937,9 +943,7 @@
      * @throws JMSException
      */
     public void acknowledge() throws JMSException {
-        synchronized (unconsumedMessages.getMutex()) {
-            clearDispatchListOnReconnect();
-        }
+        clearDispatchList();
         synchronized(deliveredMessages) {
             // Acknowledge all messages so far.
             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -1072,8 +1076,8 @@
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener.get();
         try {
+            clearDispatchList();
             synchronized (unconsumedMessages.getMutex()) {
-                clearDispatchListOnReconnect();
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage()))
{
                         if (listener != null && unconsumedMessages.isRunning()) {
@@ -1119,25 +1123,19 @@
         }
     }
 
-    // called holding unconsumedMessages.getMutex()
-    private void clearDispatchListOnReconnect() {
+    // async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again
+    private void clearDispatchList() {
         if (clearDispatchList) {
-            // we are reconnecting so lets flush the in progress
-            // messages
-            clearDispatchList = false;
-            List<MessageDispatch> list = unconsumedMessages.removeAll();
-            if (!this.info.isBrowser()) {
-                for (MessageDispatch old : list) {
-                    // ensure we don't filter this as a duplicate
-                    session.connection.rollbackDuplicate(this, old.getMessage());
-                }
-            }
-           
-            // clean, so we don't have duplicates with optimizeAcknowledge 
             synchronized (deliveredMessages) {
-                deliveredMessages.clear();        
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(getConsumerId() + " async clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
+                }
+                if (clearDispatchList) {
+                    deliveredMessages.clear();
+                    pendingAck = null;
+                    clearDispatchList = false;
+                }
             }
-            pendingAck = null;
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=901273&r1=901272&r2=901273&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed
Jan 20 16:49:22 2010
@@ -16,9 +16,71 @@
  */
 package org.apache.activemq;
 
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.management.JMSSessionStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
@@ -30,20 +92,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import java.io.File;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.blob.BlobDownloader;
-
 /**
  * <P>
  * A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -591,10 +639,20 @@
     }
 
     void clearMessagesInProgress() {
-        executor.clearMessagesInProgress();
-        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();)
{
-            ActiveMQMessageConsumer consumer = iter.next();
-            consumer.clearMessagesInProgress();
+        executor.clearMessagesInProgress();        
+        // we are called from inside the transport reconnection logic
+        // which involves us clearing all the connections' consumers
+        // dispatch and delivered lists. So rather than trying to 
+        // grab a mutex (which could be already owned by the message 
+        // listener calling the send or an ack) we allow it to complete in 
+        // a separate thread via the scheduler and notify us via 
+        // connection.transportInterruptionProcessingComplete()
+        //
+        for (final ActiveMQMessageConsumer consumer : consumers) {
+            scheduler.executeAfterDelay(new Runnable() {
+                public void run() {
+                    consumer.clearMessagesInProgress();
+                }}, 0l);
         }
     }
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java?rev=901273&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
Wed Jan 20 16:49:22 2010
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQMessageTransformation;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Test;
+
+// see https://issues.apache.org/activemq/browse/AMQ-2573
+public class FailoverConsumerUnconsumedTest {
+	
+    private static final Log LOG = LogFactory.getLog(FailoverConsumerUnconsumedTest.class);
+	private static final String QUEUE_NAME = "FailoverWithUnconsumed";
+	private String url = "tcp://localhost:61616";
+	final int prefetch = 10;
+	BrokerService broker;
+	
+	public void startCleanBroker() throws Exception {
+	    startBroker(true);
+	}
+	
+	@After
+	public void stopBroker() throws Exception {
+	    if (broker != null) {
+	        broker.stop();
+	    }
+	}
+	
+	public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+	    broker = createBroker(deleteAllMessagesOnStartup);
+        broker.start();
+	}
+
+	public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
  
+	    broker = new BrokerService();
+	    broker.addConnector(url);
+	    broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+	    return broker;
+	}
+
+	@Test
+	public void testFailoverConsumerDups() throws Exception {
+	    doTestFailoverConsumerDups(true);
+	}
+	 
+	@Test
+    public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception {
+        doTestFailoverConsumerDups(false);
+    }
+	
+	public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception
{
+	    
+	    final int maxConsumers = 4;
+        broker = createBroker(true);
+            
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+                    int consumerCount;
+
+                    // broker is killed on x create consumer
+                    @Override
+                    public Subscription addConsumer(ConnectionContext context,
+                            final ConsumerInfo info) throws Exception {
+                         if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1:0))
{
+                             context.setDontSendReponse(true);
+                             Executors.newSingleThreadExecutor().execute(new Runnable() {
  
+                                 public void run() {
+                                     LOG.info("Stopping broker on consumer: " + info.getConsumerId());
+                                     try {
+                                         broker.stop();
+                                     } catch (Exception e) {
+                                         e.printStackTrace();
+                                     }
+                                 }
+                             });
+                         }
+                        return super.addConsumer(context, info);
+                    }
+                }
+        });
+        broker.start();
+        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        cf.setWatchTopicAdvisories(watchTopicAdvisories);
+        
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+        
+        final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch);
+
+        final Vector<TestConsumer> testConsumers = new Vector<TestConsumer>();
+        for (int i=0; i<maxConsumers -1; i++) {
+            testConsumers.add(new TestConsumer(consumerSession, destination, connection));
+        }
+        
+        produceMessage(consumerSession, destination, maxConsumers * prefetch);
+               
+        assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                int totalUnconsumed = 0;
+                for (TestConsumer testConsumer : testConsumers) {
+                    long unconsumed = testConsumer.unconsumedSize();
+                    LOG.info(testConsumer.getConsumerId() + " unconsumed: " + unconsumed);
+                    totalUnconsumed += unconsumed;
+                }   
+                return totalUnconsumed == (maxConsumers-1) * prefetch;
+            }
+        }));
+        
+        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+        
+        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+            public void run() {
+                try {
+                    LOG.info("add last consumer...");
+                    testConsumers.add(new TestConsumer(consumerSession, destination, connection));
+                    commitDoneLatch.countDown();
+                    LOG.info("done add last consumer");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+               
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false);
+        broker.start();
+
+        assertTrue("consumer added through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+        
+        // each should again get prefetch messages - all unconsumed deliveries should be rolledback
+        assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition()
{
+            public boolean isSatisified() throws Exception {
+                int totalUnconsumed = 0;
+                for (TestConsumer testConsumer : testConsumers) {
+                    long unconsumed = testConsumer.unconsumedSize();
+                    LOG.info(testConsumer.getConsumerId() + " after restart: unconsumed: " + unconsumed);
+                    totalUnconsumed += unconsumed;
+                }   
+                return totalUnconsumed == (maxConsumers) * prefetch;
+            }
+        }));
+        
+        connection.close();
+    }
+        
+    private void produceMessage(final Session producerSession, Queue destination, long count)
+        throws JMSException {
+        MessageProducer producer = producerSession.createProducer(destination);
+        for (int i=0; i<count; i++) {
+            TextMessage message = producerSession.createTextMessage("Test message " + i);
+            producer.send(message);
+        }
+        producer.close();
+    }
+    
+    // allow access to unconsumedMessages
+    class TestConsumer extends ActiveMQMessageConsumer {
+        
+        TestConsumer(Session consumerSession, Destination destination, ActiveMQConnection
connection) throws Exception {
+            super((ActiveMQSession) consumerSession, 
+                new ConsumerId(new SessionId(connection.getConnectionInfo().getConnectionId(),1),
nextGen()), 
+                ActiveMQMessageTransformation.transformDestination(destination), null, "",
+                prefetch, -1, false, false, true, null);
+        }
+    
+        public int unconsumedSize() {
+            return unconsumedMessages.size();
+        }
+    }
+    
+    static long idGen = 100;
+    private static long nextGen() {
+        idGen -=5;
+        return idGen;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=901273&r1=901272&r2=901273&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Wed Jan 20 16:49:22 2010
@@ -390,7 +390,7 @@
     @Test
     public void testFailoverConsumerAckLost() throws Exception {
         // as failure depends on hash order, do a few times
-        for (int i=0; i<4; i++) {
+        for (int i=0; i<3; i++) {
             try {
                 doTestFailoverConsumerAckLost();
             } finally {

Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=901273&r1=901272&r2=901273&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Wed Jan 20 16:49:22 2010
@@ -22,9 +22,6 @@
 
 #log4j.logger.org.apache.activemq=DEBUG
 
-# get to the bottom of jmx related intermittent failures
-log4j.logger.org.apache.activemq.broker.jmx.ManagementContext=DEBUG
-
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout



Mime
View raw message