Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 18424 invoked from network); 6 Feb 2008 12:24:24 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 6 Feb 2008 12:24:24 -0000 Received: (qmail 9715 invoked by uid 500); 6 Feb 2008 12:24:17 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 9649 invoked by uid 500); 6 Feb 2008 12:24:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 9640 invoked by uid 99); 6 Feb 2008 12:24:16 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Feb 2008 04:24:16 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Feb 2008 12:24:06 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7832F1A9832; Wed, 6 Feb 2008 04:23:56 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r618981 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ broker/region/cursors/ store/ store/amq/ store/kahadaptor/ store/memory/ Date: Wed, 06 Feb 2008 12:23:50 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080206122356.7832F1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Wed Feb 6 04:23:41 2008 New Revision: 618981 URL: http://svn.apache.org/viewvc?rev=618981&view=rev Log: Reduce contention on the AMQ Store Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Feb 6 04:23:41 2008 @@ -277,4 +277,8 @@ } } } + + protected boolean isDropped(MessageReference node) { + return false; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Feb 6 04:23:41 2008 @@ -452,7 +452,10 @@ if (node == null) { break; } - if (canDispatch(node)) { + if(isDropped(node)) { + pending.remove(); + } + else if (canDispatch(node)) { pending.remove(); // Message may have been sitting in the pending // list a while waiting for the consumer to ak the message. @@ -574,6 +577,8 @@ * @throws IOException */ protected abstract boolean canDispatch(MessageReference node) throws IOException; + + protected abstract boolean isDropped(MessageReference node); /** * Used during acknowledgment to remove the message. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Feb 6 04:23:41 2008 @@ -205,4 +205,14 @@ public void destroy() { } + + protected boolean isDropped(MessageReference node) { + boolean result = false; + if(node instanceof IndirectMessageReference) { + QueueMessageReference qmr = (QueueMessageReference) node; + result = qmr.isDropped(); + } + return result; + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Feb 6 04:23:41 2008 @@ -38,6 +38,7 @@ */ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener { private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class); + protected static final int MAX_FILL_ATTEMPTS=3; protected final Destination regionDestination; protected final LinkedHashMap batchList = new LinkedHashMap (); protected boolean cacheEnabled=false; @@ -180,7 +181,10 @@ resetBatch(); this.batchResetNeeded = false; } - while (this.batchList.isEmpty() && (this.storeHasMessages || size > 0)) { + //we may have to move the store cursor past messages that have + //already been delivered - but we also don't want it to spin + int fillAttempts=0; + while (fillAttempts < MAX_FILL_ATTEMPTS && this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) { this.storeHasMessages = false; try { doFillBatch(); @@ -191,6 +195,7 @@ if (!this.batchList.isEmpty()) { this.storeHasMessages=true; } + fillAttempts++; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Wed Feb 6 04:23:41 2008 @@ -18,6 +18,7 @@ package org.apache.activemq.store; import java.io.IOException; +import java.util.concurrent.locks.Lock; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.MessageId; @@ -82,5 +83,7 @@ boolean supportsExternalBatchControl(); void setBatch(MessageId startAfter); - + + Lock getStoreLock(); + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Feb 6 04:23:41 2008 @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -78,9 +79,11 @@ private Map cpAddedMessageIds; private final boolean debug = LOG.isDebugEnabled(); private final AtomicReference mark = new AtomicReference(); + protected final Lock lock; - public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { + public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore, ActiveMQDestination destination) { this.peristenceAdapter = adapter; + this.lock=referenceStore.getStoreLock(); this.transactionStore = adapter.getTransactionStore(); this.referenceStore = referenceStore; this.destination = destination; @@ -99,7 +102,7 @@ } /** - * Not synchronized since the Journal has better throughput if you increase + * Not synchronize since the Journal has better throughput if you increase * the number of concurrent writes that it is doing. */ public final void addMessage(ConnectionContext context, final Message message) throws IOException { @@ -114,8 +117,11 @@ if (debug) { LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); } - synchronized (this) { + lock.lock(); + try { inFlightTxLocations.add(location); + }finally { + lock.unlock(); } transactionStore.addMessage(this, message, location); context.getTransaction().addSynchronization(new Synchronization() { @@ -124,8 +130,11 @@ if (debug) { LOG.debug("Transacted message add commit for: " + id + ", at: " + location); } - synchronized (AMQMessageStore.this) { + lock.lock(); + try { inFlightTxLocations.remove(location); + }finally { + lock.unlock(); } addMessage(message, location); } @@ -134,8 +143,11 @@ if (debug) { LOG.debug("Transacted message add rollback for: " + id + ", at: " + location); } - synchronized (AMQMessageStore.this) { + lock.lock(); + try { inFlightTxLocations.remove(location); + }finally { + lock.unlock(); } } }); @@ -147,10 +159,13 @@ data.setExpiration(message.getExpiration()); data.setFileId(location.getDataFileId()); data.setOffset(location.getOffset()); - synchronized (this) { + lock.lock(); + try { lastLocation = location; messages.put(message.getMessageId(), data); this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId()); + }finally { + lock.unlock(); } if (messages.size() > this.peristenceAdapter .getMaxCheckpointMessageAddSize()) { @@ -199,8 +214,11 @@ if (debug) { LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); } - synchronized (this) { + lock.lock(); + try { inFlightTxLocations.add(location); + }finally { + lock.unlock(); } transactionStore.removeMessage(this, ack, location); context.getTransaction().addSynchronization(new Synchronization() { @@ -209,9 +227,12 @@ if (debug) { LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location); } - synchronized (AMQMessageStore.this) { + lock.lock(); + try { inFlightTxLocations.remove(location); removeMessage(ack,location); + }finally { + lock.unlock(); } } @@ -219,8 +240,11 @@ if (debug) { LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location); } - synchronized (AMQMessageStore.this) { + lock.lock(); + try { inFlightTxLocations.remove(location); + }finally { + lock.unlock(); } } }); @@ -229,13 +253,16 @@ final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException { ReferenceData data; - synchronized (this) { + lock.lock(); + try{ lastLocation = location; MessageId id = ack.getLastMessageId(); data = messages.remove(id); if (data == null) { messageAcks.add(ack); } + }finally { + lock.unlock(); } if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) { flush(); @@ -273,7 +300,8 @@ LOG.debug("flush starting ..."); } CountDownLatch countDown; - synchronized (this) { + lock.lock(); + try { if (lastWrittenLocation == lastLocation) { return; } @@ -281,6 +309,8 @@ flushLatch = new CountDownLatch(1); } countDown = flushLatch; + }finally { + lock.unlock(); } try { asyncWriteTask.wakeup(); @@ -300,9 +330,12 @@ void asyncWrite() { try { CountDownLatch countDown; - synchronized (this) { + lock.lock(); + try { countDown = flushLatch; flushLatch = null; + }finally { + lock.unlock(); } mark.set(doAsyncWrite()); if (countDown != null) { @@ -323,13 +356,16 @@ final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); final Location lastLocation; // swap out the message hash maps.. - synchronized (this) { + lock.lock(); + try { cpAddedMessageIds = this.messages; cpRemovedMessageLocations = this.messageAcks; cpActiveJournalLocations = new ArrayList(inFlightTxLocations); this.messages = new LinkedHashMap(); this.messageAcks = new ArrayList(); lastLocation = this.lastLocation; + }finally { + lock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " "); @@ -371,9 +407,12 @@ } }); LOG.debug("Batch update done."); - synchronized (this) { + lock.lock(); + try { cpAddedMessageIds = null; lastWrittenLocation = lastLocation; + }finally { + lock.unlock(); } if (cpActiveJournalLocations.size() > 0) { Collections.sort(cpActiveJournalLocations); @@ -403,12 +442,15 @@ protected Location getLocation(MessageId messageId) throws IOException { ReferenceData data = null; - synchronized (this) { + lock.lock(); + try { // Is it still in flight??? data = messages.get(messageId); if (data == null && cpAddedMessageIds != null) { data = cpAddedMessageIds.get(messageId); } + }finally { + lock.unlock(); } if (data == null) { data = referenceStore.getMessageReference(messageId); @@ -483,11 +525,11 @@ } public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { - /* RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter( this, listener); if (referenceStore.supportsExternalBatchControl()) { - synchronized (this) { + lock.lock(); + try { referenceStore.recoverNextMessages(maxReturned, recoveryListener); if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { @@ -505,18 +547,21 @@ referenceStore.setBatch(recoveryListener .getLastRecoveredMessageId()); } + }finally { + lock.unlock(); } } else { flush(); referenceStore.recoverNextMessages(maxReturned, recoveryListener); } - */ + /* RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); referenceStore.recoverNextMessages(maxReturned, recoveryListener); if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { flush(); referenceStore.recoverNextMessages(maxReturned, recoveryListener); } + */ } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Feb 6 04:23:41 2008 @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.activeio.journal.Journal; import org.apache.activemq.broker.BrokerService; @@ -430,7 +431,7 @@ AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName); if (store == null) { TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); - store = new AMQTopicMessageStore(this, checkpointStore, destinationName); + store = new AMQTopicMessageStore(this,checkpointStore, destinationName); try { store.start(); } catch (Exception e) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Feb 6 04:23:41 2008 @@ -17,10 +17,6 @@ package org.apache.activemq.store.amq; import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQTopic; @@ -33,7 +29,6 @@ import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicReferenceStore; import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.util.Callback; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,7 +42,7 @@ private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class); private TopicReferenceStore topicReferenceStore; - public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { + public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { super(adapter, topicReferenceStore, destinationName); this.topicReferenceStore = topicReferenceStore; } @@ -98,8 +93,11 @@ if (debug) { LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); } - synchronized (this) { + lock.lock(); + try { inFlightTxLocations.add(location); + }finally { + lock.unlock(); } transactionStore.acknowledge(this, ack, location); context.getTransaction().addSynchronization(new Synchronization() { @@ -108,9 +106,12 @@ if (debug) { LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); } - synchronized (AMQTopicMessageStore.this) { + lock.lock(); + try { inFlightTxLocations.remove(location); acknowledge(context,messageId, location, clientId,subscriptionName); + }finally { + lock.unlock(); } } @@ -118,8 +119,11 @@ if (debug) { LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); } - synchronized (AMQTopicMessageStore.this) { + lock.lock(); + try{ inFlightTxLocations.remove(location); + }finally { + lock.unlock(); } } }); @@ -149,8 +153,12 @@ Location location, String clientId, String subscriptionName) throws IOException { MessageAck ack = null; - synchronized (this) { + lock.lock(); + try { lastLocation = location; + }finally { + lock.unlock(); + } if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)) { @@ -158,7 +166,7 @@ ack.setLastMessageId(messageId); } - } + if (ack != null) { removeMessage(context, ack); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Wed Feb 6 04:23:41 2008 @@ -55,9 +55,7 @@ if (message != null) { return recoverMessage(message); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Message id " + ref + " could not be recovered from the data store - already dispatched"); - } + LOG.error("Message id " + ref + " could not be recovered from the data store - already dispatched"); } return false; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Feb 6 04:23:41 2008 @@ -17,6 +17,9 @@ package org.apache.activemq.store.kahadaptor; import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -26,10 +29,12 @@ import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.ReferenceStore; -import org.apache.activemq.store.ReferenceStore.ReferenceData; import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.SystemUsage; +/** + * @author rajdavies + * + */ public class KahaReferenceStore implements ReferenceStore { protected final ActiveMQDestination destination; @@ -37,6 +42,7 @@ protected KahaReferenceStoreAdapter adapter; private StoreEntry batchEntry; private String lastBatchId; + protected final Lock lock = new ReentrantLock(); public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer container, ActiveMQDestination destination) throws IOException { @@ -44,6 +50,10 @@ this.messageContainer = container; this.destination = destination; } + + public Lock getStoreLock() { + return lock; + } public void start() { } @@ -55,11 +65,11 @@ return new MessageId(((ReferenceRecord)object).getMessageId()); } - public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { + public void addMessage(ConnectionContext context, Message message) throws IOException { throw new RuntimeException("Use addMessageReference instead"); } - public synchronized Message getMessage(MessageId identity) throws IOException { + public Message getMessage(MessageId identity) throws IOException { throw new RuntimeException("Use addMessageReference instead"); } @@ -73,58 +83,78 @@ return false; } - public synchronized void recover(MessageRecoveryListener listener) throws Exception { - for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer - .getNext(entry)) { - ReferenceRecord record = messageContainer.getValue(entry); - if (!recoverReference(listener, record)) { - break; + public void recover(MessageRecoveryListener listener) throws Exception { + lock.lock(); + try { + for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer + .getNext(entry)) { + ReferenceRecord record = messageContainer.getValue(entry); + if (!recoverReference(listener, record)) { + break; + } } + }finally { + lock.unlock(); } } - public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) + public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { - StoreEntry entry = batchEntry; - if (entry == null) { - entry = messageContainer.getFirst(); - } else { - entry = messageContainer.refresh(entry); - if (entry != null) { - entry = messageContainer.getNext(entry); + lock.lock(); + try { + StoreEntry entry = batchEntry; + if (entry == null) { + entry = messageContainer.getFirst(); + } else { + entry = messageContainer.refresh(entry); + if (entry != null) { + entry = messageContainer.getNext(entry); + } } - } - if (entry != null) { - int count = 0; - do { - ReferenceRecord msg = messageContainer.getValue(entry); - if (msg != null ) { - if ( recoverReference(listener, msg)) { - count++; - lastBatchId = msg.getMessageId(); + if (entry != null) { + int count = 0; + do { + ReferenceRecord msg = messageContainer.getValue(entry); + if (msg != null ) { + if ( recoverReference(listener, msg)) { + count++; + lastBatchId = msg.getMessageId(); + } + } else { + lastBatchId = null; } - } else { - lastBatchId = null; - } - batchEntry = entry; - entry = messageContainer.getNext(entry); - } while (entry != null && count < maxReturned && listener.hasSpace()); + batchEntry = entry; + entry = messageContainer.getNext(entry); + } while (entry != null && count < maxReturned && listener.hasSpace()); + } + }finally { + lock.unlock(); } } - public synchronized void addMessageReference(ConnectionContext context, MessageId messageId, + public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException { - ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); - messageContainer.put(messageId, record); - addInterest(record); + lock.lock(); + try { + ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); + messageContainer.put(messageId, record); + addInterest(record); + }finally { + lock.unlock(); + } } - public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException { - ReferenceRecord result = messageContainer.get(identity); - if (result == null) { - return null; + public ReferenceData getMessageReference(MessageId identity) throws IOException { + lock.lock(); + try { + ReferenceRecord result = messageContainer.get(identity); + if (result == null) { + return null; + } + return result.getData(); + }finally { + lock.unlock(); } - return result.getData(); } public void addReferenceFileIdsInUse() { @@ -139,36 +169,57 @@ removeMessage(ack.getLastMessageId()); } - public synchronized void removeMessage(MessageId msgId) throws IOException { - StoreEntry entry = messageContainer.getEntry(msgId); - if (entry != null) { - ReferenceRecord rr = messageContainer.remove(msgId); - if (rr != null) { - removeInterest(rr); - if (messageContainer.isEmpty() - || (lastBatchId != null && lastBatchId.equals(msgId.toString())) - || (batchEntry != null && batchEntry.equals(entry))) { - resetBatching(); + public void removeMessage(MessageId msgId) throws IOException { + lock.lock(); + try { + StoreEntry entry = messageContainer.getEntry(msgId); + if (entry != null) { + ReferenceRecord rr = messageContainer.remove(msgId); + if (rr != null) { + removeInterest(rr); + if (messageContainer.isEmpty() + || (lastBatchId != null && lastBatchId.equals(msgId.toString())) + || (batchEntry != null && batchEntry.equals(entry))) { + resetBatching(); + } } } + }finally { + lock.unlock(); } } - public synchronized void removeAllMessages(ConnectionContext context) throws IOException { - messageContainer.clear(); + public void removeAllMessages(ConnectionContext context) throws IOException { + lock.lock(); + try { + messageContainer.clear(); + }finally { + lock.unlock(); + } + } public ActiveMQDestination getDestination() { return destination; } - public synchronized void delete() { - messageContainer.clear(); + public void delete() { + lock.lock(); + try { + messageContainer.clear(); + }finally { + lock.unlock(); + } } - public synchronized void resetBatching() { - batchEntry = null; - lastBatchId = null; + public void resetBatching() { + lock.lock(); + try { + batchEntry = null; + lastBatchId = null; + }finally { + lock.unlock(); + } } public int getMessageCount() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Feb 6 04:23:41 2008 @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -177,7 +178,7 @@ } return rc; } - +/* public void buildReferenceFileIdsInUse() throws IOException { recordReferences = new HashMap(); Set destinations = getDestinations(); @@ -191,6 +192,7 @@ } } } + */ protected MapContainer getMapReferenceContainer(Object id, String containerName) @@ -249,6 +251,7 @@ * @throws IOException * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState() */ + public void recoverState() throws IOException { for (Iterator i = durableSubscribers.iterator(); i.hasNext();) { SubscriptionInfo info = i.next(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Wed Feb 6 04:23:41 2008 @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -62,33 +63,38 @@ return new MessageId(((ReferenceRecord)object).getMessageId()); } - public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { + public void addMessage(ConnectionContext context, Message message) throws IOException { throw new RuntimeException("Use addMessageReference instead"); } - public synchronized Message getMessage(MessageId identity) throws IOException { + public Message getMessage(MessageId identity) throws IOException { throw new RuntimeException("Use addMessageReference instead"); } - public synchronized void addMessageReference(final ConnectionContext context, final MessageId messageId, + public void addMessageReference(final ConnectionContext context, final MessageId messageId, final ReferenceData data) { - final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); - final int subscriberCount = subscriberMessages.size(); - if (subscriberCount > 0) { - final StoreEntry messageEntry = messageContainer.place(messageId, record); - addInterest(record); - final TopicSubAck tsa = new TopicSubAck(); - tsa.setCount(subscriberCount); - tsa.setMessageEntry(messageEntry); - final StoreEntry ackEntry = ackContainer.placeLast(tsa); - for (final Iterator i = subscriberMessages.values().iterator(); i.hasNext();) { - final TopicSubContainer container = i.next(); - final ConsumerMessageRef ref = new ConsumerMessageRef(); - ref.setAckEntry(ackEntry); - ref.setMessageEntry(messageEntry); - ref.setMessageId(messageId); - container.add(ref); + lock.lock(); + try { + final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); + final int subscriberCount = subscriberMessages.size(); + if (subscriberCount > 0) { + final StoreEntry messageEntry = messageContainer.place(messageId, record); + addInterest(record); + final TopicSubAck tsa = new TopicSubAck(); + tsa.setCount(subscriberCount); + tsa.setMessageEntry(messageEntry); + final StoreEntry ackEntry = ackContainer.placeLast(tsa); + for (final Iterator i = subscriberMessages.values().iterator(); i.hasNext();) { + final TopicSubContainer container = i.next(); + final ConsumerMessageRef ref = new ConsumerMessageRef(); + ref.setAckEntry(ackEntry); + ref.setMessageEntry(messageEntry); + ref.setMessageId(messageId); + container.add(ref); + } } + }finally { + lock.unlock(); } } @@ -121,100 +127,119 @@ return container; } - public synchronized boolean acknowledgeReference(ConnectionContext context, + public boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { boolean removeMessage = false; - String key = getSubscriptionKey(clientId, subscriptionName); - - TopicSubContainer container = subscriberMessages.get(key); - if (container != null) { - ConsumerMessageRef ref = null; - if((ref = container.remove(messageId)) != null) { - TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); - if (tsa != null) { - if (tsa.decrementCount() <= 0) { - StoreEntry entry = ref.getAckEntry(); - entry = ackContainer.refresh(entry); - ackContainer.remove(entry); - ReferenceRecord rr = messageContainer.get(messageId); - if (rr != null) { - entry = tsa.getMessageEntry(); - entry = messageContainer.refresh(entry); - messageContainer.remove(entry); - removeInterest(rr); - removeMessage = true; + lock.lock(); + try { + String key = getSubscriptionKey(clientId, subscriptionName); + + TopicSubContainer container = subscriberMessages.get(key); + if (container != null) { + ConsumerMessageRef ref = null; + if((ref = container.remove(messageId)) != null) { + TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); + if (tsa != null) { + if (tsa.decrementCount() <= 0) { + StoreEntry entry = ref.getAckEntry(); + entry = ackContainer.refresh(entry); + ackContainer.remove(entry); + ReferenceRecord rr = messageContainer.get(messageId); + if (rr != null) { + entry = tsa.getMessageEntry(); + entry = messageContainer.refresh(entry); + messageContainer.remove(entry); + removeInterest(rr); + removeMessage = true; + } } } + }else{ + //no message held + removeMessage = true; } - }else{ - //no message held - removeMessage = true; } + }finally { + lock.unlock(); } return removeMessage; } - public synchronized void acknowledge(ConnectionContext context, + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { String key = getSubscriptionKey(clientId, subscriptionName); - - TopicSubContainer container = subscriberMessages.get(key); - if (container != null) { - ConsumerMessageRef ref = container.remove(messageId); - if (ref != null) { - TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); - if (tsa != null) { - if (tsa.decrementCount() <= 0) { - StoreEntry entry = ref.getAckEntry(); - entry = ackContainer.refresh(entry); - ackContainer.remove(entry); - ReferenceRecord rr = messageContainer.get(messageId); - if (rr != null) { - entry = tsa.getMessageEntry(); - entry = messageContainer.refresh(entry); - messageContainer.remove(entry); - removeInterest(rr); + lock.lock(); + try { + TopicSubContainer container = subscriberMessages.get(key); + if (container != null) { + ConsumerMessageRef ref = container.remove(messageId); + if (ref != null) { + TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); + if (tsa != null) { + if (tsa.decrementCount() <= 0) { + StoreEntry entry = ref.getAckEntry(); + entry = ackContainer.refresh(entry); + ackContainer.remove(entry); + ReferenceRecord rr = messageContainer.get(messageId); + if (rr != null) { + entry = tsa.getMessageEntry(); + entry = messageContainer.refresh(entry); + messageContainer.remove(entry); + removeInterest(rr); + } + } else { + + ackContainer.update(ref.getAckEntry(), tsa); } - } else { - - ackContainer.update(ref.getAckEntry(), tsa); } } } - } + }finally { + lock.unlock(); + } } - public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { + public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName()); - // if already exists - won't add it again as it causes data files - // to hang around - if (!subscriberContainer.containsKey(key)) { - subscriberContainer.put(key, info); - adapter.addSubscriberState(info); - } - // add the subscriber - addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName()); - if (retroactive) { - /* - * for(StoreEntry - * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ - * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); - * ConsumerMessageRef ref=new ConsumerMessageRef(); - * ref.setAckEntry(entry); - * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); } - */ + lock.lock(); + try { + // if already exists - won't add it again as it causes data files + // to hang around + if (!subscriberContainer.containsKey(key)) { + subscriberContainer.put(key, info); + adapter.addSubscriberState(info); + } + // add the subscriber + addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName()); + if (retroactive) { + /* + * for(StoreEntry + * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ + * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); + * ConsumerMessageRef ref=new ConsumerMessageRef(); + * ref.setAckEntry(entry); + * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); } + */ + } + }finally { + lock.unlock(); } } - public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException { - SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); - if (info != null) { - adapter.removeSubscriberState(info); - } + public void deleteSubscription(String clientId, String subscriptionName) throws IOException { + lock.lock(); + try { + SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); + if (info != null) { + adapter.removeSubscriberState(info); + } removeSubscriberMessageContainer(clientId,subscriptionName); + }finally { + lock.unlock(); + } } public SubscriptionInfo[] getAllSubscriptions() throws IOException { @@ -233,41 +258,46 @@ return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName)); } - public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, + public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { String key = getSubscriptionKey(clientId, subscriptionName); - TopicSubContainer container = subscriberMessages.get(key); - if (container != null) { - int count = 0; - StoreEntry entry = container.getBatchEntry(); - if (entry == null) { - entry = container.getEntry(); - } else { - entry = container.refreshEntry(entry); - if (entry != null) { - entry = container.getNextEntry(entry); + lock.lock(); + try { + TopicSubContainer container = subscriberMessages.get(key); + if (container != null) { + int count = 0; + StoreEntry entry = container.getBatchEntry(); + if (entry == null) { + entry = container.getEntry(); + } else { + entry = container.refreshEntry(entry); + if (entry != null) { + entry = container.getNextEntry(entry); + } } - } - - if (entry != null) { - do { - ConsumerMessageRef consumerRef = container.get(entry); - ReferenceRecord msg = messageContainer.getValue(consumerRef - .getMessageEntry()); - if (msg != null) { - if (recoverReference(listener, msg)) { - count++; - container.setBatchEntry(msg.getMessageId(), entry); + + if (entry != null) { + do { + ConsumerMessageRef consumerRef = container.get(entry); + ReferenceRecord msg = messageContainer.getValue(consumerRef + .getMessageEntry()); + if (msg != null) { + if (recoverReference(listener, msg)) { + count++; + container.setBatchEntry(msg.getMessageId(), entry); + } else { + break; + } } else { - break; + container.reset(); } - } else { - container.reset(); - } - - entry = container.getNextEntry(entry); - } while (entry != null && count < maxReturned && listener.hasSpace()); + + entry = container.getNextEntry(entry); + } while (entry != null && count < maxReturned && listener.hasSpace()); + } } + }finally { + lock.unlock(); } } @@ -288,11 +318,16 @@ } } - public synchronized void resetBatching(String clientId, String subscriptionName) { - String key = getSubscriptionKey(clientId, subscriptionName); - TopicSubContainer topicSubContainer = subscriberMessages.get(key); - if (topicSubContainer != null) { - topicSubContainer.reset(); + public void resetBatching(String clientId, String subscriptionName) { + lock.lock(); + try { + String key = getSubscriptionKey(clientId, subscriptionName); + TopicSubContainer topicSubContainer = subscriberMessages.get(key); + if (topicSubContainer != null) { + topicSubContainer.reset(); + } + }finally { + lock.unlock(); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=618981&r1=618980&r2=618981&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Wed Feb 6 04:23:41 2008 @@ -124,7 +124,7 @@ } public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { - MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); + MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); if (sub != null) { sub.recoverNextMessages(maxReturned, listener); }