activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1297252 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/bugs/
Date Mon, 05 Mar 2012 22:08:52 GMT
Author: tabish
Date: Mon Mar  5 22:08:51 2012
New Revision: 1297252

URL: http://svn.apache.org/viewvc?rev=1297252&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3732

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1297252&r1=1297251&r2=1297252&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Mar  5 22:08:51 2012
@@ -61,7 +61,7 @@ public class DurableTopicSubscription ex
         this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-        
+
     }
 
     public final boolean isActive() {
@@ -178,7 +178,7 @@ public class DurableTopicSubscription ex
                 topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
             }
         }
-        
+
         for (final MessageReference node : dispatched) {
             // Mark the dispatched messages as redelivered for next time.
             Integer count = redeliveredMessages.get(node.getMessageId());
@@ -212,10 +212,10 @@ public class DurableTopicSubscription ex
                 }
             }
         }
-        prefetchExtension = 0;
+        prefetchExtension.set(0);
     }
 
-    
+
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
{
         MessageDispatch md = super.createMessageDispatch(node, message);
         if (node != QueueMessageReference.NULL_MESSAGE) {
@@ -243,7 +243,7 @@ public class DurableTopicSubscription ex
     public void removePending(MessageReference node) throws IOException {
         pending.remove(node);
     }
-    
+
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         synchronized(pending) {
             pending.addRecoveredMessage(message);
@@ -272,10 +272,9 @@ public class DurableTopicSubscription ex
         node.decrementReferenceCount();
     }
 
-    
     public synchronized String toString() {
         return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId()
+ ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" +
enqueueCounter + ", pending="
-               + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight="
+ dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
+               + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight="
+ dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
     }
 
     public SubscriptionKey getSubscriptionKey() {
@@ -301,8 +300,7 @@ public class DurableTopicSubscription ex
             }
         }
         synchronized(dispatched) {
-            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-                MessageReference node = (MessageReference) iter.next();
+            for (MessageReference node : dispatched) {
                 node.decrementReferenceCount();
             }
             dispatched.clear();
@@ -319,7 +317,7 @@ public class DurableTopicSubscription ex
             }
         }
     }
-    
+
     protected boolean isDropped(MessageReference node) {
        return false;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1297252&r1=1297251&r2=1297252&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Mar  5 22:08:51 2012
@@ -23,8 +23,11 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -47,17 +50,15 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A subscription that honors the pre-fetch option of the ConsumerInfo.
- * 
- * 
  */
 public abstract class PrefetchSubscription extends AbstractSubscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
     protected final Scheduler scheduler;
-    
+
     protected PendingMessageCursor pending;
     protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
-    protected int prefetchExtension;
+    protected final AtomicInteger prefetchExtension = new AtomicInteger();
     protected boolean usePrefetchExtension = true;
     protected long enqueueCounter;
     protected long dispatchCounter;
@@ -68,7 +69,7 @@ public abstract class PrefetchSubscripti
     protected final Object pendingLock = new Object();
     private final Object dispatchLock = new Object();
     private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
-    
+
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
         super(broker,context, info);
         this.usageManager=usageManager;
@@ -84,42 +85,38 @@ public abstract class PrefetchSubscripti
      * Allows a message to be pulled on demand by a client
      */
     public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception
{
-        // The slave should not deliver pull messages. TODO: when the slave
-        // becomes a master,
-        // He should send a NULL message to all the consumers to 'wake them up'
-        // in case
-        // they were waiting for a message.
+        // The slave should not deliver pull messages.
+        // TODO: when the slave becomes a master, He should send a NULL message to all the
+        // consumers to 'wake them up' in case they were waiting for a message.
         if (getPrefetchSize() == 0 && !isSlave()) {
-            final long dispatchCounterBeforePull;
-        	synchronized(this) {
-        		prefetchExtension++;
-        		dispatchCounterBeforePull = dispatchCounter;
-        	}
-            
-        	// Have the destination push us some messages.
-        	for (Destination dest : destinations) {
-				dest.iterate();
-			}
-        	dispatchPending();
-            
+
+            prefetchExtension.incrementAndGet();
+            final long dispatchCounterBeforePull = dispatchCounter;
+
+            // Have the destination push us some messages.
+            for (Destination dest : destinations) {
+                dest.iterate();
+            }
+            dispatchPending();
+
             synchronized(this) {
-	            // If there was nothing dispatched.. we may need to setup a timeout.
-	            if (dispatchCounterBeforePull == dispatchCounter) {
-	                // immediate timeout used by receiveNoWait()
-	                if (pull.getTimeout() == -1) {
-	                    // Send a NULL message.
-	                    add(QueueMessageReference.NULL_MESSAGE);
-	                    dispatchPending();
-	                }
-	                if (pull.getTimeout() > 0) {
-	                    scheduler.executeAfterDelay(new Runnable() {
-	
-	                        public void run() {
-	                            pullTimeout(dispatchCounterBeforePull);
-	                        }
-	                    }, pull.getTimeout());
-	                }
-	            }
+                // If there was nothing dispatched.. we may need to setup a timeout.
+                if (dispatchCounterBeforePull == dispatchCounter) {
+                    // immediate timeout used by receiveNoWait()
+                    if (pull.getTimeout() == -1) {
+                        // Send a NULL message.
+                        add(QueueMessageReference.NULL_MESSAGE);
+                        dispatchPending();
+                    }
+                    if (pull.getTimeout() > 0) {
+                        scheduler.executeAfterDelay(new Runnable() {
+                            @Override
+                            public void run() {
+                                pullTimeout(dispatchCounterBeforePull);
+                            }
+                        }, pull.getTimeout());
+                    }
+                }
             }
         }
         return null;
@@ -130,8 +127,8 @@ public abstract class PrefetchSubscripti
      * timeout was setup, then send the NULL message.
      */
     final void pullTimeout(long dispatchCounterBeforePull) {
-    	synchronized (pendingLock) {
-    		if (dispatchCounterBeforePull == dispatchCounter) {
+        synchronized (pendingLock) {
+            if (dispatchCounterBeforePull == dispatchCounter) {
                 try {
                     add(QueueMessageReference.NULL_MESSAGE);
                     dispatchPending();
@@ -144,13 +141,13 @@ public abstract class PrefetchSubscripti
 
     public void add(MessageReference node) throws Exception {
         synchronized (pendingLock) {
-            // The destination may have just been removed...  
+            // The destination may have just been removed...
             if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE)
{
                 // perhaps we should inform the caller that we are no longer valid to dispatch
to?
                 return;
             }
             enqueueCounter++;
-            pending.addMessageLast(node);    
+            pending.addMessageLast(node);
         }
         dispatchPending();
     }
@@ -188,7 +185,7 @@ public abstract class PrefetchSubscripti
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched = false;
         Destination destination = null;
-        
+
         if (!isSlave()) {
             if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
                 // suppress unexpected ack exception in this expected case
@@ -201,10 +198,10 @@ public abstract class PrefetchSubscripti
         }
         synchronized(dispatchLock) {
             if (ack.isStandardAck()) {
-            	// First check if the ack matches the dispatched. When using failover this might
-            	// not be the case. We don't ever want to ack the wrong messages.
-            	assertAckMatchesDispatched(ack);
-            	
+                // First check if the ack matches the dispatched. When using failover this
might
+                // not be the case. We don't ever want to ack the wrong messages.
+                assertAckMatchesDispatched(ack);
+
                 // Acknowledge all dispatched messages up till the message id of
                 // the acknowledgment.
                 int index = 0;
@@ -217,7 +214,7 @@ public abstract class PrefetchSubscripti
                         inAckRange = true;
                     }
                     if (inAckRange) {
-                        // Don't remove the nodes until we are committed.  
+                        // Don't remove the nodes until we are committed.
                         if (!context.isInTransaction()) {
                             dequeueCounter++;
                             node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
@@ -227,13 +224,26 @@ public abstract class PrefetchSubscripti
                         }
                         index++;
                         acknowledge(context, ack, node);
-                        if (ack.getLastMessageId().equals(messageId)) {                 

+                        if (ack.getLastMessageId().equals(messageId)) {
                             // contract prefetch if dispatch required a pull
                             if (getPrefetchSize() == 0) {
-                                prefetchExtension = Math.max(0, prefetchExtension - index);
+                                // Protect extension update against parallel updates.
+                                while (true) {
+                                    int currentExtension = prefetchExtension.get();
+                                    int newExtension = Math.max(0, currentExtension - index);
+                                    if (prefetchExtension.compareAndSet(currentExtension,
newExtension)) {
+                                        break;
+                                    }
+                                }
                             } else if (usePrefetchExtension && context.isInTransaction())
{
                                 // extend prefetch window only if not a pulling consumer
-                                prefetchExtension = Math.max(prefetchExtension, index);
+                                while (true) {
+                                    int currentExtension = prefetchExtension.get();
+                                    int newExtension = Math.max(currentExtension, index);
+                                    if (prefetchExtension.compareAndSet(currentExtension,
newExtension)) {
+                                        break;
+                                    }
+                                }
                             }
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;
@@ -264,7 +274,15 @@ public abstract class PrefetchSubscripti
                         } else {
                             registerRemoveSync(context, node);
                         }
-                        prefetchExtension = Math.max(0, prefetchExtension - 1);
+
+                        // Protect extension update against parallel updates.
+                        while (true) {
+                            int currentExtension = prefetchExtension.get();
+                            int newExtension = Math.max(0, currentExtension - 1);
+                            if (prefetchExtension.compareAndSet(currentExtension, newExtension))
{
+                                break;
+                            }
+                        }
                         acknowledge(context, ack, node);
                         destination = node.getRegionDestination();
                         callDispatchMatched = true;
@@ -286,7 +304,13 @@ public abstract class PrefetchSubscripti
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
                         if (usePrefetchExtension) {
-                            prefetchExtension = Math.max(prefetchExtension, index + 1);
+                            while (true) {
+                                int currentExtension = prefetchExtension.get();
+                                int newExtension = Math.max(currentExtension, index + 1);
+                                if (prefetchExtension.compareAndSet(currentExtension, newExtension))
{
+                                    break;
+                                }
+                            }
                         }
                         destination = node.getRegionDestination();
                         callDispatchMatched = true;
@@ -351,8 +375,13 @@ public abstract class PrefetchSubscripti
                         index++;
                         acknowledge(context, ack, node);
                         if (ack.getLastMessageId().equals(messageId)) {
-                            prefetchExtension = Math.max(0, prefetchExtension
-                                    - (index + 1));
+                            while (true) {
+                                int currentExtension = prefetchExtension.get();
+                                int newExtension = Math.max(0, currentExtension - (index
+ 1));
+                                if (prefetchExtension.compareAndSet(currentExtension, newExtension))
{
+                                    break;
+                                }
+                            }
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;
                             break;
@@ -369,7 +398,7 @@ public abstract class PrefetchSubscripti
                 }
             }
         }
-        if (callDispatchMatched && destination != null) {    
+        if (callDispatchMatched && destination != null) {
             destination.wakeup();
             dispatchPending();
         } else {
@@ -416,11 +445,11 @@ public abstract class PrefetchSubscripti
 
     /**
      * Checks an ack versus the contents of the dispatched list.
-     * 
+     *
      * @param ack
      * @throws JMSException if it does not match
      */
-	protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
+    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
         MessageId firstAckedMsg = ack.getFirstMessageId();
         MessageId lastAckedMsg = ack.getLastMessageId();
         int checkCount = 0;
@@ -468,37 +497,37 @@ public abstract class PrefetchSubscripti
     protected void sendToDLQ(final ConnectionContext context, final MessageReference node)
throws IOException, Exception {
         broker.getRoot().sendToDeadLetterQueue(context, node, this);
     }
-    
+
     public int getInFlightSize() {
         return dispatched.size();
     }
-    
+
     /**
      * Used to determine if the broker can dispatch to the consumer.
-     * 
+     *
      * @return
      */
     public boolean isFull() {
-        return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
+        return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
     }
 
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
     public boolean isLowWaterMark() {
-        return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
+        return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize()
* .4);
     }
 
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
     public boolean isHighWaterMark() {
-        return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
+        return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize()
* .9);
     }
 
     @Override
     public int countBeforeFull() {
-        return info.getPrefetchSize() + prefetchExtension - dispatched.size();
+        return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
     }
 
     public int getPendingQueueSize() {
@@ -559,16 +588,16 @@ public abstract class PrefetchSubscripti
             // Synchronized to DispatchLock
             synchronized(dispatchLock) {
                 ArrayList<MessageReference> references = new ArrayList<MessageReference>();
-	            for (MessageReference r : dispatched) {
-	                if( r.getRegionDestination() == destination) {
+                for (MessageReference r : dispatched) {
+                    if( r.getRegionDestination() == destination) {
                         references.add(r);
-	                }
-	            }
+                    }
+                }
                 rc.addAll(references);
                 destination.getDestinationStatistics().getDispatched().subtract(references.size());
                 destination.getDestinationStatistics().getInflight().subtract(references.size());
                 dispatched.removeAll(references);
-            }            
+            }
         }
         return rc;
     }
@@ -589,7 +618,7 @@ public abstract class PrefetchSubscripti
                             if (node == null) {
                                 break;
                             }
-                            
+
                             // Synchronize between dispatched list and remove of message
from pending list
                             // related to remove subscription action
                             synchronized(dispatchLock) {
@@ -634,9 +663,9 @@ public abstract class PrefetchSubscripti
         if (message == null) {
             return false;
         }
-        
+
         okForAckAsDispatchDone.countDown();
-        
+
         // No reentrant lock - Patch needed to IndirectMessageReference on method lock
         if (!isSlave()) {
 
@@ -646,7 +675,13 @@ public abstract class PrefetchSubscripti
                 dispatchCounter++;
                 dispatched.add(node);
             } else {
-                prefetchExtension = Math.max(0, prefetchExtension - 1);
+                while (true) {
+                    int currentExtension = prefetchExtension.get();
+                    int newExtension = Math.max(0, currentExtension - 1);
+                    if (prefetchExtension.compareAndSet(currentExtension, newExtension))
{
+                        break;
+                    }
+                }
             }
             if (info.isDispatchAsync()) {
                 md.setTransmitCallback(new Runnable() {
@@ -674,14 +709,14 @@ public abstract class PrefetchSubscripti
         if (node.getRegionDestination() != null) {
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
  
+                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
                 if (LOG.isTraceEnabled()) {
                     LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId()
+ " - "
                             + message.getDestination()  + ", dispatched: " + dispatchCounter
+ ", inflight: " + dispatched.size());
                 }
             }
         }
-        
+
         if (info.isDispatchAsync()) {
             try {
                 dispatchPending();
@@ -693,7 +728,7 @@ public abstract class PrefetchSubscripti
 
     /**
      * inform the MessageConsumer on the client to change it's prefetch
-     * 
+     *
      * @param newPrefetch
      */
     public void updateConsumerPrefetch(int newPrefetch) {
@@ -711,42 +746,41 @@ public abstract class PrefetchSubscripti
      * @return MessageDispatch
      */
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message)
{
+        MessageDispatch md = new MessageDispatch();
+        md.setConsumerId(info.getConsumerId());
+
         if (node == QueueMessageReference.NULL_MESSAGE) {
-            MessageDispatch md = new MessageDispatch();
             md.setMessage(null);
-            md.setConsumerId(info.getConsumerId());
             md.setDestination(null);
-            return md;
         } else {
-            MessageDispatch md = new MessageDispatch();
-            md.setConsumerId(info.getConsumerId());
             md.setDestination(node.getRegionDestination().getActiveMQDestination());
             md.setMessage(message);
             md.setRedeliveryCounter(node.getRedeliveryCounter());
-            return md;
         }
+
+        return md;
     }
 
     /**
      * Use when a matched message is about to be dispatched to the client.
-     * 
+     *
      * @param node
      * @return false if the message should not be dispatched to the client
      *         (another sub may have already dispatched it for example).
      * @throws IOException
      */
     protected abstract boolean canDispatch(MessageReference node) throws IOException;
-    
+
     protected abstract boolean isDropped(MessageReference node);
 
     /**
      * Used during acknowledgment to remove the message.
-     * 
+     *
      * @throws IOException
      */
     protected abstract void acknowledge(ConnectionContext context, final MessageAck ack,
final MessageReference node) throws IOException;
 
-    
+
     public int getMaxProducersToAudit() {
         return maxProducersToAudit;
     }
@@ -762,7 +796,7 @@ public abstract class PrefetchSubscripti
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
     }
-    
+
     public boolean isUsePrefetchExtension() {
         return usePrefetchExtension;
     }
@@ -770,4 +804,8 @@ public abstract class PrefetchSubscripti
     public void setUsePrefetchExtension(boolean usePrefetchExtension) {
         this.usePrefetchExtension = usePrefetchExtension;
     }
+
+    protected int getPrefetchExtension() {
+        return this.prefetchExtension.get();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1297252&r1=1297251&r2=1297252&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Mar  5 22:08:51 2012
@@ -60,7 +60,17 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java?rev=1297252&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java Mon Mar  5 22:08:51 2012
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3732Test {
+
+    private static Logger LOG = LoggerFactory.getLogger(AMQ3529Test.class);
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private Connection connection;
+    private Session session;
+    private BrokerService broker;
+    private String connectionUri;
+
+    private final Random pause = new Random();
+    private final long NUM_MESSAGES = 25000;
+    private final AtomicLong totalConsumed = new AtomicLong();
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        connectionFactory.getPrefetchPolicy().setAll(0);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        connection.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test(timeout = 1200000)
+    public void testInterruptionAffects() throws Exception {
+
+        connection = connectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+        Queue queue = session.createQueue("AMQ3732Test");
+
+        final LinkedBlockingQueue<Message> workQueue = new LinkedBlockingQueue<Message>();
+
+        final MessageConsumer consumer1 = session.createConsumer(queue);
+        final MessageConsumer consumer2 = session.createConsumer(queue);
+        final MessageProducer producer = session.createProducer(queue);
+
+        Thread consumer1Thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (totalConsumed.get() < NUM_MESSAGES) {
+                        Message message = consumer1.receiveNoWait();
+                        if (message != null) {
+                            workQueue.add(message);
+                        }
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        consumer1Thread.start();
+
+        Thread consumer2Thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (totalConsumed.get() < NUM_MESSAGES) {
+                        Message message = consumer2.receive(50);
+                        if (message != null) {
+                            workQueue.add(message);
+                        }
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        consumer2Thread.start();
+
+        Thread producerThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    for (int i = 0; i < NUM_MESSAGES; ++i) {
+                        producer.send(session.createTextMessage("TEST"));
+                        TimeUnit.MILLISECONDS.sleep(pause.nextInt(10));
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        producerThread.start();
+
+        Thread ackingThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (totalConsumed.get() < NUM_MESSAGES) {
+                        Message message = workQueue.take();
+                        message.acknowledge();
+                        totalConsumed.incrementAndGet();
+                        if ((totalConsumed.get() % 100) == 0) {
+                            LOG.info("Consumed " + totalConsumed.get() + " messages so far.");
+                        }
+                    }
+                } catch(Exception e) {
+                    LOG.error("Caught an unexpected error: ", e);
+                }
+            }
+        });
+        ackingThread.start();
+
+        producerThread.join();
+        consumer1Thread.join();
+        consumer2Thread.join();
+        ackingThread.join();
+
+        assertEquals(NUM_MESSAGES, totalConsumed.get());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message