Author: gtully Date: Thu Oct 16 09:48:52 2008 New Revision: 705281 URL: http://svn.apache.org/viewvc?rev=705281&view=rev Log: AMQ-1976, individual messages passing through a network bridge need an individual ack; for an individual ack, it is ok to have more than one message dispatched Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=705281&r1=705280&r2=705281&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Oct 16 09:48:52 2008 @@ -411,40 +411,44 @@ * @param lastAckedMsg * @throws JMSException if it does not match */ - protected void assertAckMatchesDispatched(MessageAck ack) - throws JMSException { + protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { MessageId firstAckedMsg = ack.getFirstMessageId(); - MessageId lastAckedMsg = ack.getLastMessageId(); + MessageId lastAckedMsg = ack.getLastMessageId(); + int checkCount = 0; + boolean checkFoundStart = false; + boolean checkFoundEnd = false; + for (MessageReference node : dispatched) { + + if (firstAckedMsg == null) { + checkFoundStart = true; + } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { + checkFoundStart = true; + } + + if (checkFoundStart) { + checkCount++; + } - int checkCount = 0; - boolean checkFoundStart = false; - boolean checkFoundEnd = false; - for (MessageReference node : dispatched) { - - if( firstAckedMsg == null ) { - checkFoundStart=true; - } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { - checkFoundStart = true; - } - - if (checkFoundStart) { - checkCount++; - } - - if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { - checkFoundEnd = true; - break; - } - } - if (!checkFoundStart && firstAckedMsg != null) - throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)"); - if (!checkFoundEnd && lastAckedMsg != null) - throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)"); - if (ack.getMessageCount() != checkCount) { - throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+ - ") differs from count in dispatched-list ("+checkCount+")"); - } - } + if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { + checkFoundEnd = true; + break; + } + } + if (!checkFoundStart && firstAckedMsg != null) + throw new JMSException("Unmatched acknowledege: " + ack + + "; Could not find Message-ID " + firstAckedMsg + + " in dispatched-list (start of ack)"); + if (!checkFoundEnd && lastAckedMsg != null) + throw new JMSException("Unmatched acknowledege: " + ack + + "; Could not find Message-ID " + lastAckedMsg + + " in dispatched-list (end of ack)"); + if (ack.getMessageCount() != checkCount && ack.isStandardAck()) { + throw new JMSException("Unmatched acknowledege: " + ack + + "; Expected message count (" + ack.getMessageCount() + + ") differs from count in dispatched-list (" + checkCount + + ")"); + } + } /** * @param context Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=705281&r1=705280&r2=705281&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Oct 16 09:48:52 2008 @@ -647,7 +647,7 @@ else{ LOG.info("Message not forwarded on to remote, because message came from remote"); } - localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); + localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); dequeueCounter.incrementAndGet(); } else { @@ -664,7 +664,7 @@ ExceptionResponse er = (ExceptionResponse)response; serviceLocalException(er.getException()); } else { - localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); + localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); dequeueCounter.incrementAndGet(); } } catch (IOException e) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java?rev=705281&r1=705280&r2=705281&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java Thu Oct 16 09:48:52 2008 @@ -39,7 +39,6 @@ public static final int MESSAGE_COUNT = 2000; public void testQueueNetworkWithConsumerFull() throws Exception { - if (true) return; bridgeAllBrokers(); startAllBrokers(); @@ -67,8 +66,6 @@ assertTrue("All messages are consumed and acked from source:" + internalQueue, internalQueue.getMessages().isEmpty()); assertEquals("messages source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getMessages().getCount()); assertEquals("inflight source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getInflight().getCount()); - - } public void setUp() throws Exception { @@ -82,7 +79,6 @@ } BrokerService broker2 = brokers.get("Broker2").broker; applyMemoryLimitPolicy(broker2); - } private void applyMemoryLimitPolicy(BrokerService broker) {