Author: gtully Date: Wed Dec 3 09:44:39 2008 New Revision: 722983 URL: http://svn.apache.org/viewvc?rev=722983&view=rev Log: resolve AMQ-2020, we may want to push setBatch into the MessageStore interface, see the use by the cursors when the cache is exhausted Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=722983&r1=722982&r2=722983&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Dec 3 09:44:39 2008 @@ -863,6 +863,9 @@ QueueMessageReference r = createMessageReference(m); BrokerSupport.resend(context, m, dest); removeMessage(context, r); + synchronized (messages) { + messages.rollback(r.getMessageId()); + } return true; } @@ -909,18 +912,12 @@ IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { // We should only move messages that can be locked. 
- Message m = r.getMessage(); - BrokerSupport.resend(context, m, dest); - removeMessage(context, r); + moveMessageTo(context, ref.getMessage(), dest); set.remove(r); if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; } - } else { - synchronized (messages) { - messages.rollback(r.getMessageId()); - } } } } while (set.size() < this.destinationStatistics.getMessages().getCount() @@ -1088,6 +1085,12 @@ }); } } + if (ack.isPoisonAck()) { + // message gone to DLQ, is ok to allow redelivery + synchronized(messages) { + messages.rollback(reference.getMessageId()); + } + } } @@ -1097,9 +1100,6 @@ synchronized(pagedInMessages) { pagedInMessages.remove(reference.getMessageId()); } - synchronized(messages) { - messages.rollback(reference.getMessageId()); - } } public void messageExpired(ConnectionContext context,MessageReference reference) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=722983&r1=722982&r2=722983&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Dec 3 09:44:39 2008 @@ -43,6 +43,7 @@ protected boolean batchResetNeeded = true; protected boolean storeHasMessages = false; protected int size; + private MessageId lastCachedId; protected AbstractStoreCursor(Destination destination) { this.regionDestination=destination; @@ -154,12 +155,20 @@ public final synchronized void addMessageLast(MessageReference node) throws Exception { if (cacheEnabled && hasSpace()) { recoverMessage(node.getMessage(),true); - }else { + lastCachedId = 
node.getMessageId(); + } else { + if (cacheEnabled) { + // sync with store on disabling the cache + setBatch(lastCachedId); + } cacheEnabled=false; } size++; } + protected void setBatch(MessageId messageId) { + } + public final synchronized void addMessageFirst(MessageReference node) throws Exception { cacheEnabled=false; size++; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=722983&r1=722982&r2=722983&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Dec 3 09:44:39 2008 @@ -17,11 +17,14 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.io.InterruptedIOException; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.amq.AMQMessageStore; +import org.apache.activemq.store.kahadaptor.KahaReferenceStore; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -71,6 +74,20 @@ this.store.resetBatching(); } + protected void setBatch(MessageId messageId) { + AMQMessageStore amqStore = (AMQMessageStore) store; + try { + amqStore.flush(); + } catch (InterruptedIOException e) { + LOG.debug("flush on setBatch resulted in exception", e); + } + KahaReferenceStore kahaStore = + (KahaReferenceStore) amqStore.getReferenceStore(); + kahaStore.setBatch(messageId); + batchResetNeeded = false; + } + + protected void doFillBatch() throws Exception { 
this.store.recoverNextMessages(this.maxBatchSize, this); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=722983&r1=722982&r2=722983&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Dec 3 09:44:39 2008 @@ -17,9 +17,7 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; -import java.util.LinkedHashMap; -import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; @@ -39,7 +37,6 @@ class TopicStorePrefetch extends AbstractStoreCursor { private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class); private TopicMessageStore store; - private final LinkedHashMap batchList = new LinkedHashMap (); private String clientId; private String subscriberName; private Subscription subscription; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=722983&r1=722982&r2=722983&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Dec 3 09:44:39 2008 
@@ -193,7 +193,7 @@ public void removeAllMessages(ConnectionContext context) throws IOException { lock.lock(); try { - Set tmpSet = new HashSet(messageContainer.keySet()); + Set tmpSet = new HashSet(messageContainer.keySet()); for (MessageId id:tmpSet) { removeMessage(id); } @@ -255,5 +255,11 @@ * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId) */ public void setBatch(MessageId startAfter) { + lock.lock(); + try { + batchEntry = messageContainer.getEntry(startAfter); + } finally { + lock.unlock(); + } } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=722983&r1=722982&r2=722983&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Wed Dec 3 09:44:39 2008 @@ -29,6 +29,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQDestination; public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { @@ -89,15 +90,13 @@ @Override public void testLoadRequestReply() throws Exception { super.testLoadRequestReply(); + + Thread.sleep(2000); // some checks on the slave AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor( AdvisoryBroker.class); - if (!deleteTempQueue || serverTransactional) { - // give temp destination removes a chance to perculate on connection.close - Thread.sleep(2000); - } assertEquals("the temp queues should not be visible as they are removed", 1, 
ab.getAdvisoryDestinations().size()); RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor( Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=722983&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Dec 3 09:44:39 2008 @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.broker.region; + +import java.io.IOException; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.InvalidSelectorException; +import javax.management.ObjectName; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class QueueDuplicatesFromStoreTest extends TestCase { + private static final Log LOG = LogFactory + .getLog(QueueDuplicatesFromStoreTest.class); + + ActiveMQQueue destination = new ActiveMQQueue("queue-" + + QueueDuplicatesFromStoreTest.class.getSimpleName()); + BrokerService brokerService; + + final static String mesageIdRoot = "11111:22222:"; + final int messageBytesSize = 256; + final String text = new String(new byte[messageBytesSize]); + + final int ackStartIndex = 100; 
+ final int ackWindow = 50; + final int ackBatchSize = 50; + final int fullWindow = 200; + final int count = 20000; + + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(false); + brokerService.deleteAllMessages(); + brokerService.start(); + } + + public void tearDown() throws Exception { + brokerService.stop(); + } + + public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception { + doTestNoDuplicateAfterCacheFullAndAcked(1024*10); + } + + public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception { + doTestNoDuplicateAfterCacheFullAndAcked(512); + } + + public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception { + final AMQPersistenceAdapter persistenceAdapter = + (AMQPersistenceAdapter) brokerService.getPersistenceAdapter(); + final MessageStore queueMessageStore = + persistenceAdapter.createQueueMessageStore(destination); + final ConnectionContext contextNotInTx = new ConnectionContext(); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + // a workaround for this issue + // queue.setUseCache(false); + queue.systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 10); + queue.setMaxAuditDepth(auditDepth); + queue.initialize(); + queue.start(); + + + ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + ProducerInfo producerInfo = new ProducerInfo(); + ProducerState producerState = new ProducerState(producerInfo); + producerExchange.setProducerState(producerState); + producerExchange.setConnectionContext(contextNotInTx); + + final CountDownLatch receivedLatch = new CountDownLatch(count); + final AtomicLong ackedCount = new AtomicLong(0); + final AtomicLong enqueueCounter = 
new AtomicLong(0); + final Vector errors = new Vector(); + + // populate the queue store, exceed memory limit so that cache is disabled + for (int i = 0; i < count; i++) { + Message message = getMessage(i); + queue.send(producerExchange, message); + } + + assertEquals("store count is correct", count, queueMessageStore + .getMessageCount()); + + // pull from store in small windows + Subscription subscription = new Subscription() { + + public void add(MessageReference node) throws Exception { + if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) { + errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: " + + node.getMessageId().getProducerSequenceId()); + } + assertEquals("is in order", enqueueCounter.get(), node + .getMessageId().getProducerSequenceId()); + receivedLatch.countDown(); + enqueueCounter.incrementAndGet(); + node.decrementReferenceCount(); + } + + public void add(ConnectionContext context, Destination destination) + throws Exception { + } + + public int countBeforeFull() { + if (isFull()) { + return 0; + } else { + return fullWindow - (int) (enqueueCounter.get() - ackedCount.get()); + } + } + + public void destroy() { + }; + + public void gc() { + } + + public ConsumerInfo getConsumerInfo() { + return consumerInfo; + } + + public ConnectionContext getContext() { + return null; + } + + public long getDequeueCounter() { + return 0; + } + + public long getDispatchedCounter() { + return 0; + } + + public int getDispatchedQueueSize() { + return 0; + } + + public long getEnqueueCounter() { + return 0; + } + + public int getInFlightSize() { + return 0; + } + + public int getInFlightUsage() { + return 0; + } + + public ObjectName getObjectName() { + return null; + } + + public int getPendingQueueSize() { + return 0; + } + + public int getPrefetchSize() { + return 0; + } + + public String getSelector() { + return null; + } + + public boolean isBrowser() { + return false; + } + + public boolean isFull() { + return 
(enqueueCounter.get() - ackedCount.get()) >= fullWindow; + } + + public boolean isHighWaterMark() { + return false; + } + + public boolean isLowWaterMark() { + return false; + } + + public boolean isRecoveryRequired() { + return false; + } + + public boolean isSlave() { + return false; + } + + public boolean matches(MessageReference node, + MessageEvaluationContext context) throws IOException { + return true; + } + + public boolean matches(ActiveMQDestination destination) { + return true; + } + + public void processMessageDispatchNotification( + MessageDispatchNotification mdn) throws Exception { + } + + public Response pullMessage(ConnectionContext context, + MessagePull pull) throws Exception { + return null; + } + + public List remove(ConnectionContext context, + Destination destination) throws Exception { + return null; + } + + public void setObjectName(ObjectName objectName) { + } + + public void setSelector(String selector) + throws InvalidSelectorException, + UnsupportedOperationException { + } + + public void updateConsumerPrefetch(int newPrefetch) { + } + + public boolean addRecoveredMessage(ConnectionContext context, + MessageReference message) throws Exception { + return false; + } + + public ActiveMQDestination getActiveMQDestination() { + return destination; + } + + public void acknowledge(ConnectionContext context, MessageAck ack) + throws Exception { + } + }; + + queue.addSubscription(contextNotInTx, subscription); + int removeIndex = 0; + do { + // Simulate periodic acks in small but recent windows + long receivedCount = enqueueCounter.get(); + if (receivedCount > ackStartIndex) { + if (receivedCount >= removeIndex + ackWindow) { + for (int j = 0; j < ackBatchSize; j++, removeIndex++) { + ackedCount.incrementAndGet(); + MessageAck ack = new MessageAck(); + ack.setLastMessageId(new MessageId(mesageIdRoot + + removeIndex)); + ack.setMessageCount(1); + queue.removeMessage(contextNotInTx, subscription, + new IndirectMessageReference( + 
getMessage(removeIndex)), ack); + + } + if (removeIndex % 1000 == 0) { + LOG.info("acked: " + removeIndex); + persistenceAdapter.checkpoint(true); + persistenceAdapter.cleanup(); + } + } + } + + } while (!receivedLatch.await(0, TimeUnit.MILLISECONDS) && errors.isEmpty()); + + assertTrue("There are no errors: " + errors, errors.isEmpty()); + assertEquals(count, enqueueCounter.get()); + assertEquals("store count is correct", count - removeIndex, + queueMessageStore.getMessageCount()); + } + + private Message getMessage(int i) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setMessageId(new MessageId(mesageIdRoot + i)); + message.setDestination(destination); + message.setPersistent(true); + message.setResponseRequired(true); + message.setText("Msg:" + i + " " + text); + assertEquals(message.getMessageId().getProducerSequenceId(), i); + return message; + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date