activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r705281 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/usecases/
Date Thu, 16 Oct 2008 16:48:53 GMT
Author: gtully
Date: Thu Oct 16 09:48:52 2008
New Revision: 705281

URL: http://svn.apache.org/viewvc?rev=705281&view=rev
Log:
AMQ-1976, individual messages passing through a network bridge need an individual ack; for
an individual ack, it is ok to have more than one message dispatched

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Oct 16 09:48:52 2008
@@ -411,40 +411,44 @@
      * @param lastAckedMsg
      * @throws JMSException if it does not match
      */
-	protected void assertAckMatchesDispatched(MessageAck ack)
-			throws JMSException {
+	protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
         MessageId firstAckedMsg = ack.getFirstMessageId();
-		MessageId lastAckedMsg = ack.getLastMessageId();
+        MessageId lastAckedMsg = ack.getLastMessageId();
+        int checkCount = 0;
+        boolean checkFoundStart = false;
+        boolean checkFoundEnd = false;
+        for (MessageReference node : dispatched) {
+
+            if (firstAckedMsg == null) {
+                checkFoundStart = true;
+            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
+                checkFoundStart = true;
+            }
+
+            if (checkFoundStart) {
+                checkCount++;
+            }
 
-		int checkCount = 0;
-		boolean checkFoundStart = false;
-		boolean checkFoundEnd = false;
-		for (MessageReference node : dispatched) {
-			
-			if( firstAckedMsg == null ) {
-				checkFoundStart=true;
-			} else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
-				checkFoundStart = true;
-			}
-
-			if (checkFoundStart) {
-				checkCount++;
-			}
-
-			if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
-				checkFoundEnd = true;
-				break;
-			}
-		}
-		if (!checkFoundStart && firstAckedMsg != null)
-			throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
-		if (!checkFoundEnd && lastAckedMsg != null)
-			throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
-		if (ack.getMessageCount() != checkCount) {
-			throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+
-					") differs from count in dispatched-list ("+checkCount+")");
-		}
-	}
+            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
+                checkFoundEnd = true;
+                break;
+            }
+        }
+        if (!checkFoundStart && firstAckedMsg != null)
+            throw new JMSException("Unmatched acknowledege: " + ack
+                    + "; Could not find Message-ID " + firstAckedMsg
+                    + " in dispatched-list (start of ack)");
+        if (!checkFoundEnd && lastAckedMsg != null)
+            throw new JMSException("Unmatched acknowledege: " + ack
+                    + "; Could not find Message-ID " + lastAckedMsg
+                    + " in dispatched-list (end of ack)");
+        if (ack.getMessageCount() != checkCount && ack.isStandardAck()) {
+            throw new JMSException("Unmatched acknowledege: " + ack
+                    + "; Expected message count (" + ack.getMessageCount()
+                    + ") differs from count in dispatched-list (" + checkCount
+                    + ")");
+        }
+    }
 
     /**
      * @param context

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Oct 16 09:48:52 2008
@@ -647,7 +647,7 @@
                             else{
                              LOG.info("Message not forwarded on to remote, because message came from remote");                               
                             }
-                            localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
+                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                             dequeueCounter.incrementAndGet();                          
 
                         } else {
@@ -664,7 +664,7 @@
                                             ExceptionResponse er = (ExceptionResponse)response;
                                             serviceLocalException(er.getException());
                                         } else {
-                                            localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
+                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                             dequeueCounter.incrementAndGet();
                                         }
                                     } catch (IOException e) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java Thu Oct 16 09:48:52 2008
@@ -39,7 +39,6 @@
     public static final int MESSAGE_COUNT = 2000;
    
     public void testQueueNetworkWithConsumerFull() throws Exception {
-        if (true) return;
         
         bridgeAllBrokers();
         startAllBrokers();
@@ -67,8 +66,6 @@
         assertTrue("All messages are consumed and acked from source:" + internalQueue, internalQueue.getMessages().isEmpty());
         assertEquals("messages source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getMessages().getCount());
         assertEquals("inflight source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getInflight().getCount());
-        
-
     }
 
     public void setUp() throws Exception {
@@ -82,7 +79,6 @@
         }
         BrokerService broker2 = brokers.get("Broker2").broker;
         applyMemoryLimitPolicy(broker2);
-        
     }
 
     private void applyMemoryLimitPolicy(BrokerService broker) {



Mime
View raw message