activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r618981 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ broker/region/cursors/ store/ store/amq/ store/kahadaptor/ store/memory/
Date Wed, 06 Feb 2008 12:23:50 GMT
Author: rajdavies
Date: Wed Feb  6 04:23:41 2008
New Revision: 618981

URL: http://svn.apache.org/viewvc?rev=618981&view=rev
Log:
Reduce contention on the AMQ Store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Feb  6 04:23:41 2008
@@ -277,4 +277,8 @@
             }
         }
     }
+    
+    protected boolean isDropped(MessageReference node) {
+       return false;
+     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Feb  6 04:23:41 2008
@@ -452,7 +452,10 @@
                             if (node == null) {
                                 break;
                             }
-                            if (canDispatch(node)) {
+                            if(isDropped(node)) {
+                                pending.remove();
+                            }
+                            else if (canDispatch(node)) {
                                 pending.remove();
                                 // Message may have been sitting in the pending
                                 // list a while waiting for the consumer to ak the message.
@@ -574,6 +577,8 @@
      * @throws IOException
      */
     protected abstract boolean canDispatch(MessageReference node) throws IOException;
+    
+    protected abstract boolean isDropped(MessageReference node);
 
     /**
      * Used during acknowledgment to remove the message.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Feb  6 04:23:41 2008
@@ -205,4 +205,14 @@
     public void destroy() {
     }
 
+   
+    protected boolean isDropped(MessageReference node) {
+       boolean result = false;
+       if(node instanceof IndirectMessageReference) {
+           QueueMessageReference qmr = (QueueMessageReference) node;
+           result = qmr.isDropped();
+       }
+       return result;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Feb  6 04:23:41 2008
@@ -38,6 +38,7 @@
  */
 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
     private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
+    protected static final int MAX_FILL_ATTEMPTS=3;
     protected final Destination regionDestination;
     protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
     protected boolean cacheEnabled=false;
@@ -180,7 +181,10 @@
             resetBatch();
             this.batchResetNeeded = false;
         }
-        while (this.batchList.isEmpty() && (this.storeHasMessages || size > 0)) {
+        //we may have to move the store cursor past messages that have 
+        //already been delivered - but we also don't want it to spin
+        int fillAttempts=0;
+        while (fillAttempts < MAX_FILL_ATTEMPTS && this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
             this.storeHasMessages = false;
             try {
                 doFillBatch();
@@ -191,6 +195,7 @@
             if (!this.batchList.isEmpty()) {
                 this.storeHasMessages=true;
             }
+            fillAttempts++;
         }
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Wed Feb  6 04:23:41 2008
@@ -18,6 +18,7 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.MessageId;
@@ -82,5 +83,7 @@
     boolean supportsExternalBatchControl();
 
     void setBatch(MessageId startAfter);
-
+    
+    Lock getStoreLock();
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Feb  6 04:23:41 2008
@@ -29,6 +29,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -78,9 +79,11 @@
     private Map<MessageId, ReferenceData> cpAddedMessageIds;
     private final boolean debug = LOG.isDebugEnabled();
     private final AtomicReference<Location> mark = new AtomicReference<Location>();
+    protected final Lock lock;
 
-    public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
+    public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore, ActiveMQDestination destination) {
         this.peristenceAdapter = adapter;
+        this.lock=referenceStore.getStoreLock();
         this.transactionStore = adapter.getTransactionStore();
         this.referenceStore = referenceStore;
         this.destination = destination;
@@ -99,7 +102,7 @@
     }
 
     /**
-     * Not synchronized since the Journal has better throughput if you increase
+     * Not synchronize since the Journal has better throughput if you increase
      * the number of concurrent writes that it is doing.
      */
     public final void addMessage(ConnectionContext context, final Message message) throws IOException {
@@ -114,8 +117,11 @@
             if (debug) {
                 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
             }
-            synchronized (this) {
+            lock.lock();
+            try {
                 inFlightTxLocations.add(location);
+            }finally {
+                lock.unlock();
             }
             transactionStore.addMessage(this, message, location);
             context.getTransaction().addSynchronization(new Synchronization() {
@@ -124,8 +130,11 @@
                     if (debug) {
                         LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
                     }
-                    synchronized (AMQMessageStore.this) {
+                    lock.lock();
+                    try {
                         inFlightTxLocations.remove(location);
+                    }finally {
+                        lock.unlock();
                     }
                     addMessage(message, location);
                 }
@@ -134,8 +143,11 @@
                     if (debug) {
                         LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
                     }
-                    synchronized (AMQMessageStore.this) {
+                    lock.lock();
+                    try {
                         inFlightTxLocations.remove(location);
+                    }finally {
+                        lock.unlock();
                     }
                 }
             });
@@ -147,10 +159,13 @@
         data.setExpiration(message.getExpiration());
         data.setFileId(location.getDataFileId());
         data.setOffset(location.getOffset());
-        synchronized (this) {
+         lock.lock();
+         try {
             lastLocation = location;
             messages.put(message.getMessageId(), data);
             this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
+        }finally {
+            lock.unlock();
         }
         if (messages.size() > this.peristenceAdapter
                 .getMaxCheckpointMessageAddSize()) {
@@ -199,8 +214,11 @@
             if (debug) {
                 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
             }
-            synchronized (this) {
+            lock.lock();
+            try {
                 inFlightTxLocations.add(location);
+            }finally {
+                lock.unlock();
             }
             transactionStore.removeMessage(this, ack, location);
             context.getTransaction().addSynchronization(new Synchronization() {
@@ -209,9 +227,12 @@
                     if (debug) {
                         LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
                     }
-                    synchronized (AMQMessageStore.this) {
+                    lock.lock();
+                    try {
                         inFlightTxLocations.remove(location);
                         removeMessage(ack,location);
+                    }finally {
+                        lock.unlock();
                     }
                 }
 
@@ -219,8 +240,11 @@
                     if (debug) {
                         LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
                     }
-                    synchronized (AMQMessageStore.this) {
+                    lock.lock();
+                    try {
                         inFlightTxLocations.remove(location);
+                    }finally {
+                        lock.unlock();
                     }
                 }
             });
@@ -229,13 +253,16 @@
 
     final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
         ReferenceData data;
-        synchronized (this) {
+        lock.lock();
+        try{
             lastLocation = location;
             MessageId id = ack.getLastMessageId();
             data = messages.remove(id);
             if (data == null) {
                 messageAcks.add(ack);
             }
+        }finally {
+            lock.unlock();
         }
         if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
             flush();
@@ -273,7 +300,8 @@
             LOG.debug("flush starting ...");
         }
         CountDownLatch countDown;
-        synchronized (this) {
+        lock.lock();
+        try {
             if (lastWrittenLocation == lastLocation) {
                 return;
             }
@@ -281,6 +309,8 @@
                 flushLatch = new CountDownLatch(1);
             }
             countDown = flushLatch;
+        }finally {
+            lock.unlock();
         }
         try {
             asyncWriteTask.wakeup();
@@ -300,9 +330,12 @@
     void asyncWrite() {
         try {
             CountDownLatch countDown;
-            synchronized (this) {
+            lock.lock();
+            try {
                 countDown = flushLatch;
                 flushLatch = null;
+            }finally {
+                lock.unlock();
             }
             mark.set(doAsyncWrite());
             if (countDown != null) {
@@ -323,13 +356,16 @@
         final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
         final Location lastLocation;
         // swap out the message hash maps..
-        synchronized (this) {
+        lock.lock();
+        try {
             cpAddedMessageIds = this.messages;
             cpRemovedMessageLocations = this.messageAcks;
             cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
             this.messages = new LinkedHashMap<MessageId, ReferenceData>();
             this.messageAcks = new ArrayList<MessageAck>();
             lastLocation = this.lastLocation;
+        }finally {
+            lock.unlock();
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " ");
@@ -371,9 +407,12 @@
             }
         });
         LOG.debug("Batch update done.");
-        synchronized (this) {
+        lock.lock();
+        try {
             cpAddedMessageIds = null;
             lastWrittenLocation = lastLocation;
+        }finally {
+            lock.unlock();
         }
         if (cpActiveJournalLocations.size() > 0) {
             Collections.sort(cpActiveJournalLocations);
@@ -403,12 +442,15 @@
     
     protected Location getLocation(MessageId messageId) throws IOException {
         ReferenceData data = null;
-        synchronized (this) {
+        lock.lock();
+        try {
             // Is it still in flight???
             data = messages.get(messageId);
             if (data == null && cpAddedMessageIds != null) {
                 data = cpAddedMessageIds.get(messageId);
             }
+        }finally {
+            lock.unlock();
         }
         if (data == null) {
             data = referenceStore.getMessageReference(messageId);
@@ -483,11 +525,11 @@
     }
 
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
-        /*
           RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
                 this, listener);
         if (referenceStore.supportsExternalBatchControl()) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 referenceStore.recoverNextMessages(maxReturned,
                         recoveryListener);
                 if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
@@ -505,18 +547,21 @@
                     referenceStore.setBatch(recoveryListener
                             .getLastRecoveredMessageId());
                 }
+            }finally {
+                lock.unlock();
             }
         } else {
             flush();
             referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         }
-        */
+        /*
         RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
         referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
             flush();
             referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         }
+        */
        
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Feb  6 04:23:41 2008
@@ -30,6 +30,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.BrokerService;
@@ -430,7 +431,7 @@
         AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName);
         if (store == null) {
             TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
-            store = new AMQTopicMessageStore(this, checkpointStore, destinationName);
+            store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
             try {
                 store.start();
             } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Feb  6 04:23:41 2008
@@ -17,10 +17,6 @@
 package org.apache.activemq.store.amq;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -33,7 +29,6 @@
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TopicReferenceStore;
 import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,7 +42,7 @@
 
     private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
     private TopicReferenceStore topicReferenceStore;
-    public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
+    public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
         super(adapter, topicReferenceStore, destinationName);
         this.topicReferenceStore = topicReferenceStore;
     }
@@ -98,8 +93,11 @@
             if (debug) {
                 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
             }
-            synchronized (this) {
+            lock.lock();
+            try {
                 inFlightTxLocations.add(location);
+            }finally {
+                lock.unlock();
             }
             transactionStore.acknowledge(this, ack, location);
             context.getTransaction().addSynchronization(new Synchronization() {
@@ -108,9 +106,12 @@
                     if (debug) {
                         LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
                     }
-                    synchronized (AMQTopicMessageStore.this) {
+                    lock.lock();
+                    try {
                         inFlightTxLocations.remove(location);
                         acknowledge(context,messageId, location, clientId,subscriptionName);
+                    }finally {
+                        lock.unlock();
                     }
                 }
 
@@ -118,8 +119,11 @@
                     if (debug) {
                         LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
                     }
-                    synchronized (AMQTopicMessageStore.this) {
+                    lock.lock();
+                    try{
                         inFlightTxLocations.remove(location);
+                    }finally {
+                        lock.unlock();
                     }
                 }
             });
@@ -149,8 +153,12 @@
             Location location, String clientId, String subscriptionName)
             throws IOException {
         MessageAck ack = null;
-        synchronized (this) {
+        lock.lock();
+        try {
             lastLocation = location;
+        }finally {
+            lock.unlock();
+        }
         
             if (topicReferenceStore.acknowledgeReference(context, clientId,
                     subscriptionName, messageId)) {
@@ -158,7 +166,7 @@
                 ack.setLastMessageId(messageId);
                
             }
-        }
+        
         if (ack != null) {
             removeMessage(context, ack);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Wed Feb  6 04:23:41 2008
@@ -55,9 +55,7 @@
         if (message != null) {
             return recoverMessage(message);
         } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Message id " + ref + " could not be recovered from the data store - already dispatched");
-            }
+            LOG.error("Message id " + ref + " could not be recovered from the data store - already dispatched");
         }
         return false;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Feb  6 04:23:41 2008
@@ -17,6 +17,9 @@
 package org.apache.activemq.store.kahadaptor;
 
 import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -26,10 +29,12 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
-import org.apache.activemq.store.ReferenceStore.ReferenceData;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 
+/**
+ * @author rajdavies
+ *
+ */
 public class KahaReferenceStore implements ReferenceStore {
 
     protected final ActiveMQDestination destination;
@@ -37,6 +42,7 @@
     protected KahaReferenceStoreAdapter adapter;
     private StoreEntry batchEntry;
     private String lastBatchId;
+    protected final Lock lock = new ReentrantLock();
 
     public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
                               ActiveMQDestination destination) throws IOException {
@@ -44,6 +50,10 @@
         this.messageContainer = container;
         this.destination = destination;
     }
+    
+    public Lock getStoreLock() {
+        return lock;
+    }
 
     public void start() {
     }
@@ -55,11 +65,11 @@
         return new MessageId(((ReferenceRecord)object).getMessageId());
     }
 
-    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+    public void addMessage(ConnectionContext context, Message message) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public synchronized Message getMessage(MessageId identity) throws IOException {
+    public Message getMessage(MessageId identity) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
@@ -73,58 +83,78 @@
         return false;
     }
 
-    public synchronized void recover(MessageRecoveryListener listener) throws Exception {
-        for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
-            .getNext(entry)) {
-            ReferenceRecord record = messageContainer.getValue(entry);
-            if (!recoverReference(listener, record)) {
-                break;
+    public void recover(MessageRecoveryListener listener) throws Exception {
+        lock.lock();
+        try {
+            for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
+                .getNext(entry)) {
+                ReferenceRecord record = messageContainer.getValue(entry);
+                if (!recoverReference(listener, record)) {
+                    break;
+                }
             }
+        }finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
+    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
         throws Exception {
-        StoreEntry entry = batchEntry;
-        if (entry == null) {
-            entry = messageContainer.getFirst();
-        } else {
-            entry = messageContainer.refresh(entry);
-            if (entry != null) {
-                entry = messageContainer.getNext(entry);
+        lock.lock();
+        try {
+            StoreEntry entry = batchEntry;
+            if (entry == null) {
+                entry = messageContainer.getFirst();
+            } else {
+                entry = messageContainer.refresh(entry);
+                if (entry != null) {
+                    entry = messageContainer.getNext(entry);
+                }
             }
-        }
-        if (entry != null) {      
-            int count = 0;
-            do {
-                ReferenceRecord msg = messageContainer.getValue(entry);
-                if (msg != null ) {
-                    if ( recoverReference(listener, msg)) {
-                        count++;
-                        lastBatchId = msg.getMessageId();
+            if (entry != null) {      
+                int count = 0;
+                do {
+                    ReferenceRecord msg = messageContainer.getValue(entry);
+                    if (msg != null ) {
+                        if ( recoverReference(listener, msg)) {
+                            count++;
+                            lastBatchId = msg.getMessageId();
+                        }
+                    } else {
+                        lastBatchId = null;
                     }
-                } else {
-                    lastBatchId = null;
-                }
-                batchEntry = entry;
-                entry = messageContainer.getNext(entry);
-            } while (entry != null && count < maxReturned && listener.hasSpace());
+                    batchEntry = entry;
+                    entry = messageContainer.getNext(entry);
+                } while (entry != null && count < maxReturned && listener.hasSpace());
+            }
+        }finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void addMessageReference(ConnectionContext context, MessageId messageId,
+    public void addMessageReference(ConnectionContext context, MessageId messageId,
                                                  ReferenceData data) throws IOException {
-        ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
-        messageContainer.put(messageId, record);
-        addInterest(record);
+        lock.lock();
+        try {
+            ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+            messageContainer.put(messageId, record);
+            addInterest(record);
+        }finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException {
-        ReferenceRecord result = messageContainer.get(identity);
-        if (result == null) {
-            return null;
+    public ReferenceData getMessageReference(MessageId identity) throws IOException {
+        lock.lock();
+        try {
+            ReferenceRecord result = messageContainer.get(identity);
+            if (result == null) {
+                return null;
+            }
+            return result.getData();
+        }finally {
+            lock.unlock();
         }
-        return result.getData();
     }
     
     public void addReferenceFileIdsInUse() {
@@ -139,36 +169,57 @@
         removeMessage(ack.getLastMessageId());
     }
 
-    public synchronized void removeMessage(MessageId msgId) throws IOException {    
-        StoreEntry entry = messageContainer.getEntry(msgId);
-        if (entry != null) {
-            ReferenceRecord rr = messageContainer.remove(msgId);
-            if (rr != null) {
-                removeInterest(rr);
-                if (messageContainer.isEmpty()
-                    || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
-                    || (batchEntry != null && batchEntry.equals(entry))) {
-                    resetBatching();
+    public void removeMessage(MessageId msgId) throws IOException {  
+        lock.lock();
+        try {
+            StoreEntry entry = messageContainer.getEntry(msgId);
+            if (entry != null) {
+                ReferenceRecord rr = messageContainer.remove(msgId);
+                if (rr != null) {
+                    removeInterest(rr);
+                    if (messageContainer.isEmpty()
+                        || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
+                        || (batchEntry != null && batchEntry.equals(entry))) {
+                        resetBatching();
+                    }
                 }
             }
+        }finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
-        messageContainer.clear();
+    public void removeAllMessages(ConnectionContext context) throws IOException {
+        lock.lock();
+        try {
+            messageContainer.clear();
+        }finally {
+            lock.unlock();
+        }
+        
     }
 
     public ActiveMQDestination getDestination() {
         return destination;
     }
 
-    public synchronized void delete() {
-        messageContainer.clear();
+    public void delete() {
+        lock.lock();
+        try {
+            messageContainer.clear();
+        }finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized void resetBatching() {
-        batchEntry = null;
-        lastBatchId = null;
+    public void resetBatching() {
+        lock.lock();
+        try {
+            batchEntry = null;
+            lastBatchId = null;
+        }finally {
+            lock.unlock();
+        }
     }
 
     public int getMessageCount() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Feb  6 04:23:41 2008
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -177,7 +178,7 @@
         }
         return rc;
     }
-
+/*
     public void buildReferenceFileIdsInUse() throws IOException {
         recordReferences = new HashMap<Integer, AtomicInteger>();
         Set<ActiveMQDestination> destinations = getDestinations();
@@ -191,6 +192,7 @@
             }
         }
     }
+    */
 
     protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id,
                                                                                 String containerName)
@@ -249,6 +251,7 @@
      * @throws IOException
      * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
      */
+   
     public void recoverState() throws IOException {
         for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) {
             SubscriptionInfo info = i.next();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Wed Feb  6 04:23:41 2008
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -62,33 +63,38 @@
         return new MessageId(((ReferenceRecord)object).getMessageId());
     }
 
-    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+    public void addMessage(ConnectionContext context, Message message) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public synchronized Message getMessage(MessageId identity) throws IOException {
+    public Message getMessage(MessageId identity) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public synchronized void addMessageReference(final ConnectionContext context, final MessageId messageId,
+    public  void addMessageReference(final ConnectionContext context, final MessageId messageId,
                                     final ReferenceData data) {
-        final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
-        final int subscriberCount = subscriberMessages.size();
-        if (subscriberCount > 0) {
-            final StoreEntry messageEntry = messageContainer.place(messageId, record);
-            addInterest(record);
-            final TopicSubAck tsa = new TopicSubAck();
-            tsa.setCount(subscriberCount);
-            tsa.setMessageEntry(messageEntry);
-            final StoreEntry ackEntry = ackContainer.placeLast(tsa);
-            for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
-                final TopicSubContainer container = i.next();
-                final ConsumerMessageRef ref = new ConsumerMessageRef();
-                ref.setAckEntry(ackEntry);
-                ref.setMessageEntry(messageEntry);
-                ref.setMessageId(messageId);
-                container.add(ref);
+        lock.lock();
+        try {
+            final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+            final int subscriberCount = subscriberMessages.size();
+            if (subscriberCount > 0) {
+                final StoreEntry messageEntry = messageContainer.place(messageId, record);
+                addInterest(record);
+                final TopicSubAck tsa = new TopicSubAck();
+                tsa.setCount(subscriberCount);
+                tsa.setMessageEntry(messageEntry);
+                final StoreEntry ackEntry = ackContainer.placeLast(tsa);
+                for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
+                    final TopicSubContainer container = i.next();
+                    final ConsumerMessageRef ref = new ConsumerMessageRef();
+                    ref.setAckEntry(ackEntry);
+                    ref.setMessageEntry(messageEntry);
+                    ref.setMessageId(messageId);
+                    container.add(ref);
+                }
             }
+        }finally {
+            lock.unlock();
         }
     }
 
@@ -121,100 +127,119 @@
         return container;
     }
 
-    public synchronized boolean acknowledgeReference(ConnectionContext context,
+    public boolean acknowledgeReference(ConnectionContext context,
             String clientId, String subscriptionName, MessageId messageId)
             throws IOException {
         boolean removeMessage = false;
-        String key = getSubscriptionKey(clientId, subscriptionName);
-
-        TopicSubContainer container = subscriberMessages.get(key);
-        if (container != null) {
-            ConsumerMessageRef ref = null;
-            if((ref = container.remove(messageId)) != null) {
-                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
-                if (tsa != null) {
-                    if (tsa.decrementCount() <= 0) {
-                        StoreEntry entry = ref.getAckEntry();
-                        entry = ackContainer.refresh(entry);
-                        ackContainer.remove(entry);
-                        ReferenceRecord rr = messageContainer.get(messageId);
-                        if (rr != null) {
-                            entry = tsa.getMessageEntry();
-                            entry = messageContainer.refresh(entry);
-                            messageContainer.remove(entry);
-                            removeInterest(rr);
-                            removeMessage = true;
+        lock.lock();
+            try {
+            String key = getSubscriptionKey(clientId, subscriptionName);
+    
+            TopicSubContainer container = subscriberMessages.get(key);
+            if (container != null) {
+                ConsumerMessageRef ref = null;
+                if((ref = container.remove(messageId)) != null) {
+                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+                    if (tsa != null) {
+                        if (tsa.decrementCount() <= 0) {
+                            StoreEntry entry = ref.getAckEntry();
+                            entry = ackContainer.refresh(entry);
+                            ackContainer.remove(entry);
+                            ReferenceRecord rr = messageContainer.get(messageId);
+                            if (rr != null) {
+                                entry = tsa.getMessageEntry();
+                                entry = messageContainer.refresh(entry);
+                                messageContainer.remove(entry);
+                                removeInterest(rr);
+                                removeMessage = true;
+                            }
                         }
                     }
+                }else{
+                    //no message held
+                    removeMessage = true;
                 }
-            }else{
-                //no message held
-                removeMessage = true;
             }
+        }finally {
+            lock.unlock();
         }
         return removeMessage;
 
     }
     
-    public synchronized void acknowledge(ConnectionContext context,
+    public void acknowledge(ConnectionContext context,
 			String clientId, String subscriptionName, MessageId messageId)
 			throws IOException {
 		String key = getSubscriptionKey(clientId, subscriptionName);
-
-		TopicSubContainer container = subscriberMessages.get(key);
-		if (container != null) {
-            ConsumerMessageRef ref = container.remove(messageId);
-            if (ref != null) {
-                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
-                if (tsa != null) {
-                    if (tsa.decrementCount() <= 0) {
-                        StoreEntry entry = ref.getAckEntry();
-                        entry = ackContainer.refresh(entry);
-                        ackContainer.remove(entry);
-                        ReferenceRecord rr = messageContainer.get(messageId);
-                        if (rr != null) {
-                            entry = tsa.getMessageEntry();
-                            entry = messageContainer.refresh(entry);
-                            messageContainer.remove(entry);
-                            removeInterest(rr);
+		lock.lock();
+		try {
+    		TopicSubContainer container = subscriberMessages.get(key);
+    		if (container != null) {
+                ConsumerMessageRef ref = container.remove(messageId);
+                if (ref != null) {
+                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+                    if (tsa != null) {
+                        if (tsa.decrementCount() <= 0) {
+                            StoreEntry entry = ref.getAckEntry();
+                            entry = ackContainer.refresh(entry);
+                            ackContainer.remove(entry);
+                            ReferenceRecord rr = messageContainer.get(messageId);
+                            if (rr != null) {
+                                entry = tsa.getMessageEntry();
+                                entry = messageContainer.refresh(entry);
+                                messageContainer.remove(entry);
+                                removeInterest(rr);
+                            }
+                        } else {
+    
+                            ackContainer.update(ref.getAckEntry(), tsa);
                         }
-                    } else {
-
-                        ackContainer.update(ref.getAckEntry(), tsa);
                     }
                 }
             }
-        }
+		}finally {
+		    lock.unlock();
+		}
 	}
 
-    public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+    public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
         String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
-        // if already exists - won't add it again as it causes data files
-        // to hang around
-        if (!subscriberContainer.containsKey(key)) {
-            subscriberContainer.put(key, info);
-            adapter.addSubscriberState(info);
-        }
-        // add the subscriber
-        addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
-        if (retroactive) {
-            /*
-             * for(StoreEntry
-             * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
-             * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
-             * ConsumerMessageRef ref=new ConsumerMessageRef();
-             * ref.setAckEntry(entry);
-             * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
-             */
+        lock.lock();
+        try {
+            // if already exists - won't add it again as it causes data files
+            // to hang around
+            if (!subscriberContainer.containsKey(key)) {
+                subscriberContainer.put(key, info);
+                adapter.addSubscriberState(info);
+            }
+            // add the subscriber
+            addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
+            if (retroactive) {
+                /*
+                 * for(StoreEntry
+                 * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+                 * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+                 * ConsumerMessageRef ref=new ConsumerMessageRef();
+                 * ref.setAckEntry(entry);
+                 * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
+                 */
+            }
+        }finally {
+            lock.unlock();
         }
     }
 
-    public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
-        SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
-        if (info != null) {
-            adapter.removeSubscriberState(info);
-        }
+    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+        lock.lock();
+        try {
+            SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
+            if (info != null) {
+                adapter.removeSubscriberState(info);
+            }
         removeSubscriberMessageContainer(clientId,subscriptionName);
+        }finally {
+            lock.unlock();
+        }
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
@@ -233,41 +258,46 @@
         return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
     }
 
-    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
+    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                                  MessageRecoveryListener listener) throws Exception {
         String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer container = subscriberMessages.get(key);
-        if (container != null) {
-            int count = 0;
-            StoreEntry entry = container.getBatchEntry();
-            if (entry == null) {
-                entry = container.getEntry();
-            } else {
-                entry = container.refreshEntry(entry);
-                if (entry != null) {
-                    entry = container.getNextEntry(entry);
+        lock.lock();
+        try {
+            TopicSubContainer container = subscriberMessages.get(key);
+            if (container != null) {
+                int count = 0;
+                StoreEntry entry = container.getBatchEntry();
+                if (entry == null) {
+                    entry = container.getEntry();
+                } else {
+                    entry = container.refreshEntry(entry);
+                    if (entry != null) {
+                        entry = container.getNextEntry(entry);
+                    }
                 }
-            }
-           
-            if (entry != null) {
-                do {
-                    ConsumerMessageRef consumerRef = container.get(entry);
-                    ReferenceRecord msg = messageContainer.getValue(consumerRef
-                            .getMessageEntry());
-                    if (msg != null) {
-                        if (recoverReference(listener, msg)) {
-                            count++;
-                            container.setBatchEntry(msg.getMessageId(), entry);
+               
+                if (entry != null) {
+                    do {
+                        ConsumerMessageRef consumerRef = container.get(entry);
+                        ReferenceRecord msg = messageContainer.getValue(consumerRef
+                                .getMessageEntry());
+                        if (msg != null) {
+                            if (recoverReference(listener, msg)) {
+                                count++;
+                                container.setBatchEntry(msg.getMessageId(), entry);
+                            } else {
+                                break;
+                            }
                         } else {
-                            break;
+                            container.reset();
                         }
-                    } else {
-                        container.reset();
-                    }
-
-                    entry = container.getNextEntry(entry);
-                } while (entry != null && count < maxReturned && listener.hasSpace());
+    
+                        entry = container.getNextEntry(entry);
+                    } while (entry != null && count < maxReturned && listener.hasSpace());
+                }
             }
+        }finally {
+            lock.unlock();
         }
     }
 
@@ -288,11 +318,16 @@
         }
     }
 
-    public synchronized void resetBatching(String clientId, String subscriptionName) {
-        String key = getSubscriptionKey(clientId, subscriptionName);
-        TopicSubContainer topicSubContainer = subscriberMessages.get(key);
-        if (topicSubContainer != null) {
-            topicSubContainer.reset();
+    public void resetBatching(String clientId, String subscriptionName) {
+        lock.lock();
+        try {
+            String key = getSubscriptionKey(clientId, subscriptionName);
+            TopicSubContainer topicSubContainer = subscriberMessages.get(key);
+            if (topicSubContainer != null) {
+                topicSubContainer.reset();
+            }
+        }finally {
+            lock.unlock();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=618981&r1=618980&r2=618981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Wed Feb  6 04:23:41 2008
@@ -124,7 +124,7 @@
     }
 
     public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
-        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+        MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {
             sub.recoverNextMessages(maxReturned, listener);
         }



Mime
View raw message