activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r945102 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ store/ store/kahadb/
Date Mon, 17 May 2010 11:53:28 GMT
Author: rajdavies
Date: Mon May 17 11:53:28 2010
New Revision: 945102

URL: http://svn.apache.org/viewvc?rev=945102&view=rev
Log:
added support for concurrent dispatch and store of persistent messages in KahaDB

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon May 17 11:53:28 2010
@@ -29,18 +29,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -105,13 +105,13 @@ public class Queue extends BaseDestinati
     private final Object dispatchMutex = new Object();
     private boolean useConsumerPriority = true;
     private boolean strictOrderDispatch = false;
-    private QueueDispatchSelector dispatchSelector;
+    private final QueueDispatchSelector dispatchSelector;
     private boolean optimizedDispatch = false;
     private boolean firstConsumer = false;
     private int timeBeforeDispatchStarts = 0;
     private int consumersBeforeDispatchStarts = 0;
     private CountDownLatch consumersBeforeStartsLatch;
-    private AtomicLong pendingWakeups = new AtomicLong();
+    private final AtomicLong pendingWakeups = new AtomicLong();
 
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
@@ -163,6 +163,7 @@ public class Queue extends BaseDestinati
     
     class FlowControlTimeoutTask extends Thread {
         
+        @Override
         public void run() {
             TimeoutMessage timeout;
             try {
@@ -220,6 +221,7 @@ public class Queue extends BaseDestinati
         }
     }
 
+    @Override
     public void initialize() throws Exception {
         if (this.messages == null) {
             if (destination.isTemporary() || broker == null || store == null) {
@@ -554,6 +556,7 @@ public class Queue extends BaseDestinati
 
     void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
+        Future<Object> result = null;
         synchronized (sendLock) {
             if (store != null && message.isPersistent()) {
                 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
@@ -568,8 +571,11 @@ public class Queue extends BaseDestinati
                     waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
                 }
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
-                store.addMessage(context, message);
-
+                if (context.isInTransaction()) {
+                    store.addMessage(context, message);
+                }else {
+                    result = store.asyncAddQueueMessage(context, message);
+                }
             }
         }
         if (context.isInTransaction()) {
@@ -578,6 +584,7 @@ public class Queue extends BaseDestinati
             // our memory. This increment is decremented once the tx finishes..
             message.incrementReferenceCount();
             context.getTransaction().addSynchronization(new Synchronization() {
+                @Override
                 public void afterCommit() throws Exception {
                     try {
                         // It could take while before we receive the commit
@@ -603,6 +610,14 @@ public class Queue extends BaseDestinati
             // usage manager.
             sendMessage(context, message);
         }
+        if (result != null && !result.isCancelled()) {
+            try {
+            result.get();
+            }catch(CancellationException e) {
+              //ignore - the task has been cancelled if the message
+              // has already been deleted
+            }
+        }
     }
 
     private void expireMessages() {
@@ -651,7 +666,7 @@ public class Queue extends BaseDestinati
                 ack.setLastMessageId(node.getMessageId());
                 ack.setMessageCount(1);
             }
-            store.removeMessage(context, ack);
+            store.removeAsyncMessage(context, ack);
         }
     }
 
@@ -666,6 +681,7 @@ public class Queue extends BaseDestinati
         return msg;
     }
 
+    @Override
     public String toString() {
         int size = 0;
         synchronized (messages) {
@@ -725,6 +741,7 @@ public class Queue extends BaseDestinati
 
     // Properties
     // -------------------------------------------------------------------------
+    @Override
     public ActiveMQDestination getActiveMQDestination() {
         return destination;
     }
@@ -936,7 +953,7 @@ public class Queue extends BaseDestinati
             for (MessageReference ref : list) {
                 try {
                     QueueMessageReference r = (QueueMessageReference) ref;
-                    removeMessage(c, (IndirectMessageReference) r);
+                    removeMessage(c, r);
                 } catch (IOException e) {
                 }
             }
@@ -1273,6 +1290,7 @@ public class Queue extends BaseDestinati
                 return messageId.equals(r.getMessageId().toString());
             }
 
+            @Override
             public String toString() {
                 return "MessageIdFilter: " + messageId;
             }
@@ -1326,12 +1344,14 @@ public class Queue extends BaseDestinati
             } finally {
                 context.getTransaction().addSynchronization(new Synchronization() {
 
+                    @Override
                     public void afterCommit() throws Exception {
                         getDestinationStatistics().getDequeues().increment();
                         dropMessage(reference);
                         wakeup();
                     }
 
+                    @Override
                     public void afterRollback() throws Exception {
                         reference.setAcked(false);
                     }
@@ -1634,6 +1654,7 @@ public class Queue extends BaseDestinati
      * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
      * (org.apache.activemq.command.MessageDispatchNotification)
      */
+    @Override
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
         // do dispatch
         Subscription sub = getMatchingSubscription(messageDispatchNotification);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon May 17 11:53:28 2010
@@ -21,10 +21,11 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
-
+import java.util.concurrent.Future;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -87,6 +88,7 @@ public class Topic extends BaseDestinati
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
 
+    @Override
     public void initialize() throws Exception {
         super.initialize();
         if (store != null) {
@@ -402,6 +404,7 @@ public class Topic extends BaseDestinati
         final ConnectionContext context = producerExchange.getConnectionContext();
         message.setRegionDestination(this);
         message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
+        Future<Object> result = null;
 
         if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
             if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
@@ -413,13 +416,18 @@ public class Topic extends BaseDestinati
 
                 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
             }
-            topicStore.addMessage(context, message);
+            if (context.isInTransaction()) {
+                topicStore.addMessage(context, message);
+            }else {
+                result = topicStore.asyncAddTopicMessage(context, message);
+            }      
         }
 
         message.incrementReferenceCount();
 
         if (context.isInTransaction()) {
             context.getTransaction().addSynchronization(new Synchronization() {
+                @Override
                 public void afterCommit() throws Exception {
                     // It could take while before we receive the commit
                     // operration.. by that time the message could have
@@ -445,6 +453,14 @@ public class Topic extends BaseDestinati
                 message.decrementReferenceCount();
             }
         }
+        if (result != null && !result.isCancelled()) {
+            try {
+            result.get();
+            }catch(CancellationException e) {
+              //ignore - the task has been cancelled if the message
+              // has already been deleted
+            }
+        }
 
     }
 
@@ -452,6 +468,7 @@ public class Topic extends BaseDestinati
         return durableSubcribers.size() == 0;
     }
 
+    @Override
     public String toString() {
         return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Mon May 17 11:53:28 2010
@@ -17,18 +17,24 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.MemoryUsage;
 
 abstract public class AbstractMessageStore implements MessageStore {
+    static final FutureTask<Object> FUTURE;
     protected final ActiveMQDestination destination;
 
     public AbstractMessageStore(ActiveMQDestination destination) {
         this.destination = destination;
     }
-
+    
     public void dispose(ConnectionContext context) {
     }
 
@@ -44,16 +50,43 @@ abstract public class AbstractMessageSto
 
     public void setMemoryUsage(MemoryUsage memoryUsage) {
     }
-    
+
     public void setBatch(MessageId messageId) throws IOException, Exception {
     }
-    
+
     /**
      * flag to indicate if the store is empty
+     * 
      * @return true if the message count is 0
-     * @throws Exception 
+     * @throws Exception
      */
-     public boolean isEmpty() throws Exception{
-         return getMessageCount()==0;
-     }
+    public boolean isEmpty() throws Exception {
+        return getMessageCount() == 0;
+    }
+
+    public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
+        addMessage(context, message);
+        return FUTURE;
+    }
+
+       
+    public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
+        addMessage(context, message);
+        return FUTURE;
+    }
+
+    public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+        removeMessage(context, ack);
+    }
+    
+    static class CallableImplementation implements Callable<Object> {
+        public Object call() throws Exception {
+            return null;
+        }
+    }
+
+    static {
+       FUTURE = new FutureTask<Object>(new CallableImplementation());
+       FUTURE.run();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Mon May 17 11:53:28 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -40,6 +41,30 @@ public interface MessageStore extends Se
      * @throws IOException
      */
     void addMessage(ConnectionContext context, Message message) throws IOException;
+    
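+    /**
+     * Adds a queue message to the message store. The returned Future
+     * lets the caller wait until the store has finished with the message.
+     * 
+     * @param context context
+     * @param message the message to add
+     * @return a Future to track when this is complete
+     * @throws IOException
+     *             if the message cannot be added to the store
+     */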
+    Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
+    
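+    /**
+     * Adds a topic message to the message store. The returned Future
+     * lets the caller wait until the store has finished with the message.
+     * 
+     * @param context context
+     * @param message the message to add
+     * @return a Future to track when this is complete
+     * @throws IOException
+     *             if the message cannot be added to the store
+     */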
+    Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
 
     /**
      * Looks up a message using either the String messageID or the
@@ -62,6 +87,8 @@ public interface MessageStore extends Se
      * @throws IOException
      */
     void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
+    
+    void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException;
 
     /**
      * Removes all the messages from the message store.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Mon May 17 11:53:28 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -100,4 +101,16 @@ public class ProxyMessageStore implement
     public boolean isEmpty() throws Exception {
        return delegate.isEmpty();
     }
+
+    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+       return delegate.asyncAddQueueMessage(context, message);
+    }
+    
+    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+        return delegate.asyncAddTopicMessage(context, message);
+     }
+    
+    public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+        delegate.removeAsyncMessage(context, ack);       
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon May 17 11:53:28 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -140,4 +141,16 @@ public class ProxyTopicMessageStore impl
     public boolean isEmpty() throws Exception {
         return delegate.isEmpty();
      }
+
+    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+        return delegate.asyncAddTopicMessage(context, message);
+     }
+
+    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+        return delegate.asyncAddQueueMessage(context, message);
+    }
+
+    public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+        delegate.removeAsyncMessage(context, ack);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon May 17 11:53:28 2010
@@ -17,9 +17,7 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -42,7 +40,7 @@ public interface TopicMessageStore exten
      * @throws IOException
      */
     void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
-
+    
     /**
      * @param clientId
      * @param subscriptionName

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Mon May 17 11:53:28 2010
@@ -386,4 +386,35 @@ public class KahaDBPersistenceAdapter im
     public void setDirectoryArchive(File directoryArchive) {
         letter.setDirectoryArchive(directoryArchive);
     }
+    
+    public boolean isConcurrentStoreAndDispatchQueues() {
+        return letter.isConcurrentStoreAndDispatchQueues();
+    }
+    
+    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
+        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
+    }    
+    
+    public boolean isConcurrentStoreAndDispatchTopics() {
+        return letter.isConcurrentStoreAndDispatchTopics();
+    }
+    
+    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
+        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
+    }    
+    
+    public int getMaxAsyncJobs() {
+        return letter.getMaxAsyncJobs();
+    }
+    /**
+     * @param maxAsyncJobs the maxAsyncJobs to set
+     */
+    public void setMaxAsyncJobs(int maxAsyncJobs) {
+       letter.setMaxAsyncJobs(maxAsyncJobs);
+    }   
+    
+    @Override
+    public String toString() {
+        return "KahaDBPersistenceAdapter";
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Mon May 17 11:53:28 2010
@@ -18,12 +18,25 @@ package org.apache.activemq.store.kahadb
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -46,7 +59,6 @@ import org.apache.activemq.store.Persist
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
@@ -62,23 +74,167 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
 
-
 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
-
+    private static final int MAX_ASYNC_JOBS = 10000;
+    protected ExecutorService queueExecutor;
+    protected ExecutorService topicExecutor;
+    protected final Map<MessageId, StoreQueueTask> asyncQueueMap = new HashMap<MessageId, StoreQueueTask>();
+    protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
     private final WireFormat wireFormat = new OpenWireFormat();
+    private SystemUsage usageManager;
+    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
+    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
+    private Semaphore queueSemaphore;
+    private Semaphore topicSemaphore;
+    private boolean concurrentStoreAndDispatchQueues = true;
+    private boolean concurrentStoreAndDispatchTopics = true;
+    private int maxAsyncJobs = MAX_ASYNC_JOBS;
+
+    public KahaDBStore() {
 
+    }
     public void setBrokerName(String brokerName) {
     }
+
     public void setUsageManager(SystemUsage usageManager) {
+        this.usageManager = usageManager;
+    }
+
+    public SystemUsage getUsageManager() {
+        return this.usageManager;
+    }
+
+    /**
+     * @return the concurrentStoreAndDispatchQueues setting
+     */
+    public boolean isConcurrentStoreAndDispatchQueues() {
+        return this.concurrentStoreAndDispatchQueues;
+    }
+
+    /**
+     * @param concurrentStoreAndDispatch
+     *            the concurrentStoreAndDispatchQueues value to set
+     */
+    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
+        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
+    }
+
+    /**
+     * @return the concurrentStoreAndDispatchTopics setting
+     */
+    public boolean isConcurrentStoreAndDispatchTopics() {
+        return this.concurrentStoreAndDispatchTopics;
+    }
+
+    /**
+     * @param concurrentStoreAndDispatch
+     *            the concurrentStoreAndDispatchTopics value to set
+     */
+    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
+        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
+    }
+
+    /**
+     * @return the maxAsyncJobs
+     */
+    public int getMaxAsyncJobs() {
+        return this.maxAsyncJobs;
+    }
+    /**
+     * @param maxAsyncJobs
+     *            the maxAsyncJobs to set
+     */
+    public void setMaxAsyncJobs(int maxAsyncJobs) {
+        this.maxAsyncJobs = maxAsyncJobs;
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
+        this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
+        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
+        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
+        this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
+                new ThreadFactory() {
+                    public Thread newThread(Runnable runnable) {
+                        Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
+                        thread.setDaemon(true);
+                        return thread;
+                    }
+                });
+        this.topicExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
+                new ThreadFactory() {
+                    public Thread newThread(Runnable runnable) {
+                        Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
+                        thread.setDaemon(true);
+                        return thread;
+                    }
+                });
+        super.doStart();
+
+    }
+
+    @Override
+    public void doStop(ServiceStopper stopper) throws Exception {
+        this.queueSemaphore.drainPermits();
+        this.topicSemaphore.drainPermits();
+        if (this.queueExecutor != null) {
+            this.queueExecutor.shutdownNow();
+        }
+        if (this.topicExecutor != null) {
+            this.topicExecutor.shutdownNow();
+        }
+        super.doStop(stopper);
+    }
+
+    protected StoreQueueTask removeQueueTask(MessageId id) {
+        StoreQueueTask task = this.asyncQueueMap.remove(id);
+        if (task != null) {
+            task.getMessage().decrementReferenceCount();
+            this.queueSemaphore.release();
+        }
+        return task;
+    }
+
+    protected void addQueueTask(StoreQueueTask task) throws IOException {
+        try {
+            this.queueSemaphore.acquire();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.getMessage());
+        }
+        this.asyncQueueMap.put(task.getMessage().getMessageId(), task);
+        task.getMessage().incrementReferenceCount();
+        this.queueExecutor.execute(task);
+    }
+
+    protected StoreTopicTask removeTopicTask(MessageId id) {
+        StoreTopicTask task = this.asyncTopicMap.remove(id);
+        if (task != null) {
+            task.getMessage().decrementReferenceCount();
+            this.topicSemaphore.release();
+        }
+        return task;
+    }
+
+    protected void addTopicTask(StoreTopicTask task) throws IOException {
+        try {
+            this.topicSemaphore.acquire();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.getMessage());
+        }
+        this.asyncTopicMap.put(task.getMessage().getMessageId(), task);
+        task.getMessage().incrementReferenceCount();
+        this.topicExecutor.execute(task);
     }
 
     public TransactionStore createTransactionStore() throws IOException {
-        return new TransactionStore(){
-            
+        return new TransactionStore() {
+
             public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
                 store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
             }
@@ -90,22 +246,24 @@ public class KahaDBStore extends Message
             }
             public void recover(TransactionRecoveryListener listener) throws IOException {
                 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
-                    XATransactionId xid = (XATransactionId)entry.getKey();
+                    XATransactionId xid = (XATransactionId) entry.getKey();
                     ArrayList<Message> messageList = new ArrayList<Message>();
                     ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-                    
+
                     for (Operation op : entry.getValue()) {
-                        if( op.getClass() == AddOpperation.class ) {
-                            AddOpperation addOp = (AddOpperation)op;
-                            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
+                        if (op.getClass() == AddOpperation.class) {
+                            AddOpperation addOp = (AddOpperation) op;
+                            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand()
+                                    .getMessage().newInput()));
                             messageList.add(msg);
                         } else {
-                            RemoveOpperation rmOp = (RemoveOpperation)op;
-                            MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
+                            RemoveOpperation rmOp = (RemoveOpperation) op;
+                            MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(rmOp.getCommand()
+                                    .getAck().newInput()));
                             ackList.add(ack);
                         }
                     }
-                    
+
                     Message[] addedMessages = new Message[messageList.size()];
                     MessageAck[] acks = new MessageAck[ackList.size()];
                     messageList.toArray(addedMessages);
@@ -125,7 +283,7 @@ public class KahaDBStore extends Message
 
         public KahaDBMessageStore(ActiveMQDestination destination) {
             super(destination);
-            this.dest = convert( destination );
+            this.dest = convert(destination);
         }
 
         @Override
@@ -133,24 +291,52 @@ public class KahaDBStore extends Message
             return destination;
         }
 
+        /**
+         * Adds a queue message, using a background store task when concurrent
+         * store and dispatch is enabled for queues.
+         * <p>
+         * With the feature disabled the call is delegated unchanged to the
+         * superclass implementation.
+         *
+         * @param context connection context handed to the store task
+         * @param message the message to store
+         * @return the future of the created store task, or whatever the
+         *         superclass returns when the feature is disabled
+         * @throws IOException if registering the store task fails
+         */
+        @Override
+        public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
+                throws IOException {
+            if (!isConcurrentStoreAndDispatchQueues()) {
+                return super.asyncAddQueueMessage(context, message);
+            }
+            StoreQueueTask storeTask = new StoreQueueTask(this, context, message);
+            addQueueTask(storeTask);
+            return storeTask.getFuture();
+        }
+
+        /**
+         * Removes a queue message, first trying to cancel a store task that
+         * may still be pending for it.
+         * <p>
+         * The call to removeMessage is skipped only when concurrent store and
+         * dispatch is enabled for queues, a pending task was found for the
+         * ack's last message id, and cancel() on that task returned true. In
+         * every other case the call falls through to removeMessage.
+         *
+         * @param context connection context passed on to removeMessage
+         * @param ack acknowledgement; its last message id selects the task
+         * @throws IOException if removeMessage fails
+         */
+        @Override
+        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+            if (isConcurrentStoreAndDispatchQueues()) {
+                StoreQueueTask pending = removeQueueTask(ack.getLastMessageId());
+                if (pending != null && pending.cancel()) {
+                    return;
+                }
+            }
+            removeMessage(context, ack);
+        }
+
         public void addMessage(ConnectionContext context, Message message) throws IOException {
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toString());
-            command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) );
+            command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
 
             store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
-            
+
         }
-        
+
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
             command.setMessageId(ack.getLastMessageId().toString());
-            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
+            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
             store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
         }
 
@@ -162,37 +348,40 @@ public class KahaDBStore extends Message
 
         public Message getMessage(MessageId identity) throws IOException {
             final String key = identity.toString();
-            
-            // Hopefully one day the page file supports concurrent read operations... but for now we must
+
+            // Hopefully one day the page file supports concurrent read
+            // operations... but for now we must
             // externally synchronize...
             Location location;
-            synchronized(indexMutex) {
-                location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){
+            synchronized (indexMutex) {
+                location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
                     public Location execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long sequence = sd.messageIdIndex.get(tx, key);
-                        if( sequence ==null ) {
+                        if (sequence == null) {
                             return null;
                         }
                         return sd.orderIndex.get(tx, sequence).location;
                     }
                 });
             }
-            if( location == null ) {
+            if (location == null) {
                 return null;
             }
-            
+
             return loadMessage(location);
         }
-        
+
         public int getMessageCount() throws IOException {
-            synchronized(indexMutex) {
-                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+            synchronized (indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                     public Integer execute(Transaction tx) throws IOException {
-                        // Iterate through all index entries to get a count of messages in the destination.
+                        // Iterate through all index entries to get a count of
+                        // messages in the destination.
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        int rc=0;
-                        for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+                        int rc = 0;
+                        for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
+                                .hasNext();) {
                             iterator.next();
                             rc++;
                         }
@@ -201,12 +390,14 @@ public class KahaDBStore extends Message
                 });
             }
         }
-        
+
+        @Override
         public boolean isEmpty() throws IOException {
-            synchronized(indexMutex) {
-                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>(){
+            synchronized (indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
                     public Boolean execute(Transaction tx) throws IOException {
-                        // Iterate through all index entries to get a count of messages in the destination.
+                        // Iterate through all index entries to get a count of
+                        // messages in the destination.
                         StoredDestination sd = getStoredDestination(dest, tx);
                         return sd.locationIndex.isEmpty(tx);
                     }
@@ -214,40 +405,41 @@ public class KahaDBStore extends Message
             }
         }
 
-
         public void recover(final MessageRecoveryListener listener) throws Exception {
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
+                                .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getValue().location) );
+                            listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
                     }
                 });
             }
         }
 
-        long cursorPos=0;
-        
+        long cursorPos = 0;
+
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Entry<Long, MessageKeys> entry=null;
+                        Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                                .hasNext();) {
                             entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
+                            listener.recoverMessage(loadMessage(entry.getValue().location));
                             counter++;
-                            if( counter >= maxReturned ) {
+                            if (counter >= maxReturned) {
                                 break;
                             }
                         }
-                        if( entry!=null ) {
-                            cursorPos = entry.getKey()+1;
+                        if (entry != null) {
+                            cursorPos = entry.getKey() + 1;
                         }
                     }
                 });
@@ -255,29 +447,29 @@ public class KahaDBStore extends Message
         }
 
         public void resetBatching() {
-            cursorPos=0;
+            cursorPos = 0;
         }
 
-        
         @Override
         public void setBatch(MessageId identity) throws IOException {
             final String key = identity.toString();
-            
-            // Hopefully one day the page file supports concurrent read operations... but for now we must
+
+            // Hopefully one day the page file supports concurrent read
+            // operations... but for now we must
             // externally synchronize...
             Long location;
-            synchronized(indexMutex) {
-                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
+            synchronized (indexMutex) {
+                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
                     public Long execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         return sd.messageIdIndex.get(tx, key);
                     }
                 });
             }
-            if( location!=null ) {
-                cursorPos=location+1;
+            if (location != null) {
+                cursorPos = location + 1;
             }
-            
+
         }
 
         @Override
@@ -285,32 +477,65 @@ public class KahaDBStore extends Message
         }
         @Override
         public void start() throws Exception {
+            super.start();
         }
         @Override
         public void stop() throws Exception {
+            super.stop();
         }
-        
+
     }
-        
+
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
-        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
+        private final AtomicInteger subscriptionCount = new AtomicInteger();
+        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
             super(destination);
+            this.subscriptionCount.set(getAllSubscriptions().length);
+        }
+
+        /**
+         * Adds a topic message, using a background store task when concurrent
+         * store and dispatch is enabled for topics.
+         * <p>
+         * The task is created with the current value of the subscription
+         * counter. With the feature disabled the call is delegated unchanged
+         * to the superclass implementation.
+         *
+         * @param context connection context handed to the store task
+         * @param message the message to store
+         * @return the future of the created store task, or whatever the
+         *         superclass returns when the feature is disabled
+         * @throws IOException if registering the store task fails
+         */
+        @Override
+        public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
+                throws IOException {
+            if (!isConcurrentStoreAndDispatchTopics()) {
+                return super.asyncAddTopicMessage(context, message);
+            }
+            StoreTopicTask storeTask = new StoreTopicTask(this, context, message, subscriptionCount.get());
+            addTopicTask(storeTask);
+            return storeTask.getFuture();
         }
-        
-        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+
+        /**
+         * Acknowledges a topic message for one subscription.
+         * <p>
+         * When concurrent store and dispatch is enabled for topics and a store
+         * task is still registered for the message, the subscription key is
+         * recorded on that task. If addSubscriptionKey then returns true, the
+         * task is removed and cancelled (presumably because every subscription
+         * has acknowledged; confirm against StoreTopicTask). In all other
+         * cases the acknowledgement goes through doAcknowledge.
+         *
+         * @param context connection context passed on to doAcknowledge
+         * @param clientId client id of the acknowledging subscription
+         * @param subscriptionName name of the acknowledging subscription
+         * @param messageId id of the acknowledged message
+         * @throws IOException if doAcknowledge fails
+         */
+        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId)
+                throws IOException {
+            String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            StoreTopicTask pending = null;
+            if (isConcurrentStoreAndDispatchTopics()) {
+                pending = asyncTopicMap.get(messageId);
+            }
+            if (pending == null) {
+                doAcknowledge(context, subscriptionKey, messageId);
+                return;
+            }
+            if (pending.addSubscriptionKey(subscriptionKey)) {
+                removeTopicTask(messageId);
+                pending.cancel();
+            }
+        }
+
+        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId)
+                throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
-            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            command.setSubscriptionKey(subscriptionKey);
             command.setMessageId(messageId.toString());
-            // We are not passed a transaction info.. so we can't participate in a transaction.
-            // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
-            // to pass back to the XA recover method.
-            // command.setTransactionInfo();
             store(command, false);
         }
 
         public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
-            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
+            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
+                    .getSubscriptionName());
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
@@ -318,6 +543,7 @@ public class KahaDBStore extends Message
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
             command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             store(command, isEnableJournalDiskSyncs() && true);
+            this.subscriptionCount.incrementAndGet();
         }
 
         public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
@@ -325,111 +551,120 @@ public class KahaDBStore extends Message
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
             store(command, isEnableJournalDiskSyncs() && true);
+            this.subscriptionCount.decrementAndGet();
         }
 
         public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-            
+
             final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
+                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
+                                .hasNext();) {
                             Entry<String, KahaSubscriptionCommand> entry = iterator.next();
-                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
+                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
+                                    .getValue().getSubscriptionInfo().newInput()));
                             subscriptions.add(info);
 
                         }
                     }
                 });
             }
-            
-            SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
+
+            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
             subscriptions.toArray(rc);
             return rc;
         }
 
         public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            synchronized(indexMutex) {
-                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
+            synchronized (indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
                     public SubscriptionInfo execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
-                        if( command ==null ) {
+                        if (command == null) {
                             return null;
                         }
-                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
+                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
+                                .getSubscriptionInfo().newInput()));
                     }
                 });
             }
         }
-       
+
         public int getMessageCount(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            synchronized(indexMutex) {
-                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+            synchronized (indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                     public Integer execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                        if ( cursorPos==null ) {
+                        if (cursorPos == null) {
                             // The subscription might not exist.
                             return 0;
                         }
                         cursorPos += 1;
-                        
+
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                                .hasNext();) {
                             iterator.next();
                             counter++;
                         }
                         return counter;
                     }
                 });
-            }        
+            }
         }
 
-        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
+        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
+                throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                         cursorPos += 1;
-                        
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                                .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
+                            listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
                     }
                 });
             }
         }
 
-        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
+                final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
-                        if( cursorPos == null ) {
+                        if (cursorPos == null) {
                             cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                             cursorPos += 1;
                         }
-                        
-                        Entry<Long, MessageKeys> entry=null;
+
+                        Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                                .hasNext();) {
                             entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
+                            listener.recoverMessage(loadMessage(entry.getValue().location));
                             counter++;
-                            if( counter >= maxReturned ) {
+                            if (counter >= maxReturned) {
                                 break;
                             }
                         }
-                        if( entry!=null ) {
+                        if (entry != null) {
                             sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
                         }
                     }
@@ -440,8 +675,8 @@ public class KahaDBStore extends Message
         public void resetBatching(String clientId, String subscriptionName) {
             try {
                 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-                synchronized(indexMutex) {
-                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                synchronized (indexMutex) {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
                             sd.subscriptionCursors.remove(subscriptionKey);
@@ -454,10 +689,10 @@ public class KahaDBStore extends Message
         }
     }
 
-    String subscriptionKey(String clientId, String subscriptionName){
-        return clientId+":"+subscriptionName;
+    String subscriptionKey(String clientId, String subscriptionName) {
+        return clientId + ":" + subscriptionName;
     }
-    
+
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         return new KahaDBMessageStore(destination);
     }
@@ -469,8 +704,9 @@ public class KahaDBStore extends Message
     /**
      * Cleanup method to remove any state associated with the given destination.
      * This method does not stop the message store (it might not be cached).
-     *
-     * @param destination Destination to forget
+     * 
+     * @param destination
+     *            Destination to forget
      */
     public void removeQueueMessageStore(ActiveMQQueue destination) {
     }
@@ -478,24 +714,25 @@ public class KahaDBStore extends Message
     /**
      * Cleanup method to remove any state associated with the given destination
      * This method does not stop the message store (it might not be cached).
-     *
-     * @param destination Destination to forget
+     * 
+     * @param destination
+     *            Destination to forget
      */
     public void removeTopicMessageStore(ActiveMQTopic destination) {
     }
 
     public void deleteAllMessages() throws IOException {
-        deleteAllMessages=true;
+        deleteAllMessages = true;
     }
-    
-    
+
     public Set<ActiveMQDestination> getDestinations() {
         try {
             final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
+                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
+                                .hasNext();) {
                             Entry<String, StoredDestination> entry = iterator.next();
                             if (!isEmptyTopic(entry, tx)) {
                                 rc.add(convert(entry.getKey()));
@@ -503,7 +740,8 @@ public class KahaDBStore extends Message
                         }
                     }
 
-                    private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) throws IOException {
+                    private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
+                            throws IOException {
                         boolean isEmptyTopic = false;
                         ActiveMQDestination dest = convert(entry.getKey());
                         if (dest.isTopic()) {
@@ -521,13 +759,13 @@ public class KahaDBStore extends Message
             throw new RuntimeException(e);
         }
     }
-    
+
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
-    
+
     public long size() {
-        if ( !started.get() ) {
+        if (!started.get()) {
             return 0;
         }
         try {
@@ -546,15 +784,14 @@ public class KahaDBStore extends Message
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
-    
+
     public void checkpoint(boolean sync) throws IOException {
         super.checkpointCleanup(false);
     }
-    
-    
-    ///////////////////////////////////////////////////////////////////
+
+    // /////////////////////////////////////////////////////////////////
     // Internal helper methods.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
 
     /**
      * @param location
@@ -562,35 +799,35 @@ public class KahaDBStore extends Message
      * @throws IOException
      */
     Message loadMessage(Location location) throws IOException {
-        KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
-        Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) );
+        KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
+        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
         return msg;
     }
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Internal conversion methods.
-    ///////////////////////////////////////////////////////////////////
-    
+    // /////////////////////////////////////////////////////////////////
+
     KahaTransactionInfo createTransactionInfo(TransactionId txid) {
-        if( txid ==null ) {
+        if (txid == null) {
             return null;
         }
         KahaTransactionInfo rc = new KahaTransactionInfo();
-        
+
         // Link it up to the previous record that was part of the transaction.
         ArrayList<Operation> tx = inflightTransactions.get(txid);
-        if( tx!=null ) {
-            rc.setPreviousEntry(convert(tx.get(tx.size()-1).location));
+        if (tx != null) {
+            rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
         }
-        
-        if( txid.isLocalTransaction() ) {
-            LocalTransactionId t = (LocalTransactionId)txid;
+
+        if (txid.isLocalTransaction()) {
+            LocalTransactionId t = (LocalTransactionId) txid;
             KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
             kahaTxId.setConnectionId(t.getConnectionId().getValue());
             kahaTxId.setTransacitonId(t.getValue());
             rc.setLocalTransacitonId(kahaTxId);
         } else {
-            XATransactionId t = (XATransactionId)txid;
+            XATransactionId t = (XATransactionId) txid;
             KahaXATransactionId kahaTxId = new KahaXATransactionId();
             kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
             kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
@@ -599,18 +836,18 @@ public class KahaDBStore extends Message
         }
         return rc;
     }
-    
+
     KahaLocation convert(Location location) {
         KahaLocation rc = new KahaLocation();
         rc.setLogId(location.getDataFileId());
         rc.setOffset(location.getOffset());
         return rc;
     }
-    
+
     KahaDestination convert(ActiveMQDestination dest) {
         KahaDestination rc = new KahaDestination();
         rc.setName(dest.getPhysicalName());
-        switch( dest.getDestinationType() ) {
+        switch (dest.getDestinationType()) {
         case ActiveMQDestination.QUEUE_TYPE:
             rc.setType(DestinationType.QUEUE);
             return rc;
@@ -630,13 +867,13 @@ public class KahaDBStore extends Message
 
     ActiveMQDestination convert(String dest) {
         int p = dest.indexOf(":");
-        if( p<0 ) {
+        if (p < 0) {
             throw new IllegalArgumentException("Not in the valid destination format");
         }
         int type = Integer.parseInt(dest.substring(0, p));
-        String name = dest.substring(p+1);
-        
-        switch( KahaDestination.DestinationType.valueOf(type) ) {
+        String name = dest.substring(p + 1);
+
+        switch (KahaDestination.DestinationType.valueOf(type)) {
         case QUEUE:
             return new ActiveMQQueue(name);
         case TOPIC:
@@ -645,9 +882,113 @@ public class KahaDBStore extends Message
             return new ActiveMQTempQueue(name);
         case TEMP_TOPIC:
             return new ActiveMQTempTopic(name);
-        default:    
+        default:
             throw new IllegalArgumentException("Not in the valid destination format");
         }
     }
-        
+
+    // Runnable that adds one message to the given MessageStore and reports the outcome through a
+    // Future; the AtomicBoolean lets exactly one of cancel() or run() act on the task.
+    class StoreQueueTask implements Runnable {
+        protected final Message message;
+        protected final ConnectionContext context;
+        protected final MessageStore store;
+        protected final InnerFutureTask future;
+        protected final AtomicBoolean done = new AtomicBoolean(); // set once cancel() or run() claims the task
+
+        public StoreQueueTask(MessageStore store, ConnectionContext context, Message message) {
+            this.store = store;
+            this.context = context;
+            this.message = message;
+            this.future = new InnerFutureTask(this);
+        }
+
+        public Future<Object> getFuture() {
+            return this.future;
+        }
+
+        public boolean cancel() { // true only if this call was the first to claim the task
+            if (this.done.compareAndSet(false, true)) {
+                this.future.cancel(false); // false: do not interrupt a thread already running it
+                return true;
+            }
+            return false;
+        }
+
+        public void run() {
+            try {
+                if (this.done.compareAndSet(false, true)) { // skipped when cancel() got there first
+                    this.store.addMessage(context, message);
+                    removeQueueTask(this.message.getMessageId());
+                    this.future.complete();
+                }
+            } catch (Exception e) { // NOTE(review): removeQueueTask is not called on this path - confirm
+                this.future.setException(e);
+            }
+        }
+
+        protected Message getMessage() {
+            return this.message;
+        }
+
+        private class InnerFutureTask extends FutureTask<Object> { // lets the task use FutureTask's protected setters
+
+            public InnerFutureTask(Runnable runnable) {
+                super(runnable, null); // the future carries no result value, only completion/failure
+            }
+
+            public void setException(final Exception e) {
+                super.setException(e);
+            }
+
+            public void complete() {
+                super.set(null);
+            }
+        }
+    }
+
+    // Topic variant of StoreQueueTask: acks recorded before run() are applied right after the add.
+    class StoreTopicTask extends StoreQueueTask {
+        private final int subscriptionCount;
+        private final List<String> subscriptionKeys = new ArrayList<String>(1);
+        private final KahaDBTopicMessageStore topicStore;
+        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
+                int subscriptionCount) {
+            super(store, context, message);
+            this.topicStore = store;
+            this.subscriptionCount = subscriptionCount;
+        }
+
+        /**
+         * Records a subscription key; run() acknowledges the message for every key recorded so far.
+         * The size is read while still holding the list's monitor, the same lock run() iterates under.
+         * @param key the subscription key to acknowledge for this message
+         * @return true once at least subscriptionCount keys are recorded (all acknowledgements received)
+         */
+        public boolean addSubscriptionKey(String key) {
+            synchronized (this.subscriptionKeys) {
+                this.subscriptionKeys.add(key);
+                return this.subscriptionKeys.size() >= this.subscriptionCount;
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (this.done.compareAndSet(false, true)) {
+                    this.topicStore.addMessage(context, message);
+                    // apply any acks we have
+                    synchronized (this.subscriptionKeys) {
+                        for (String key : this.subscriptionKeys) {
+                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
+                        }
+                    }
+                    removeQueueTask(this.message.getMessageId());
+                    this.future.complete();
+                }
+            } catch (Exception e) {
+                this.future.setException(e);
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon May 17 11:53:28 2010
@@ -57,6 +57,8 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.index.BTreeIndex;
@@ -78,7 +80,7 @@ import org.apache.kahadb.util.SequenceSe
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
 
-public class MessageDatabase implements BrokerServiceAware {
+public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 	
 	private BrokerService brokerService;
 
@@ -171,7 +173,7 @@ public class MessageDatabase implements 
     protected AtomicBoolean opened = new AtomicBoolean();
     private LockFile lockFile;
     private boolean ignoreMissingJournalfiles = false;
-    private int indexCacheSize = 100;
+    private int indexCacheSize = 10000;
     private boolean checkForCorruptJournalFiles = false;
     private boolean checksumJournalFiles = false;
     
@@ -179,16 +181,14 @@ public class MessageDatabase implements 
     public MessageDatabase() {
     }
 
-    public void start() throws Exception {
-        if (started.compareAndSet(false, true)) {
-        	load();
-        }
+    @Override
+    public void doStart() throws Exception {
+        load();
     }
 
-    public void stop() throws Exception {
-        if (started.compareAndSet(true, false)) {
-            unload();
-        }
+    @Override
+    public void doStop(ServiceStopper stopper) throws Exception {
+        unload();
     }
 
 	private void loadPageFile() throws IOException {



Mime
View raw message