activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1296469 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: ./ kahadaptor/ kahadb/ memory/
Date Fri, 02 Mar 2012 21:11:30 GMT
Author: tabish
Date: Fri Mar  2 21:11:30 2012
New Revision: 1296469

URL: http://svn.apache.org/viewvc?rev=1296469&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3750

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Fri Mar  2 21:11:30 2012
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -35,75 +36,87 @@ abstract public class AbstractMessageSto
     public AbstractMessageStore(ActiveMQDestination destination) {
         this.destination = destination;
     }
-    
+
+    @Override
     public void dispose(ConnectionContext context) {
     }
 
+    @Override
     public void start() throws Exception {
     }
 
+    @Override
     public void stop() throws Exception {
     }
 
+    @Override
     public ActiveMQDestination getDestination() {
         return destination;
     }
 
+    @Override
     public void setMemoryUsage(MemoryUsage memoryUsage) {
     }
 
+    @Override
     public void setBatch(MessageId messageId) throws IOException, Exception {
     }
 
     /**
      * flag to indicate if the store is empty
-     * 
+     *
      * @return true if the message count is 0
      * @throws Exception
      */
+    @Override
     public boolean isEmpty() throws Exception {
         return getMessageCount() == 0;
     }
-    
+
+    @Override
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         this.prioritizedMessages = prioritizedMessages;
-    }    
+    }
 
+    @Override
     public boolean isPrioritizedMessages() {
         return this.prioritizedMessages;
     }
 
-
-    public void addMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException{
-        addMessage(context,message);
+    @Override
+    public void addMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException{
+        addMessage(context, message);
     }
 
-
+    @Override
     public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
         addMessage(context, message);
         return FUTURE;
     }
 
+    @Override
     public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
-        addMessage(context, message,canOptimizeHint);
+        addMessage(context, message, canOptimizeHint);
         return FUTURE;
     }
 
-       
+    @Override
     public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
-        addMessage(context, message,canOptimizeHint);
+        addMessage(context, message, canOptimizeHint);
         return FUTURE;
     }
 
+    @Override
     public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
         addMessage(context, message);
         return FUTURE;
     }
 
+    @Override
     public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
         removeMessage(context, ack);
     }
-    
+
     static class CallableImplementation implements Callable<Object> {
         public Object call() throws Exception {
             return null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Fri Mar  2 21:11:30 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.store;
 
 import java.io.IOException;
 import java.util.concurrent.Future;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -28,14 +29,14 @@ import org.apache.activemq.usage.MemoryU
 
 /**
  * Represents a message store which is used by the persistent implementations
- * 
- * 
+ *
+ *
  */
 public interface MessageStore extends Service {
 
     /**
      * Adds a message to the message store
-     * 
+     *
      * @param context context
      * @param message
      * @throws IOException
@@ -50,15 +51,15 @@ public interface MessageStore extends Se
      * @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
      * @throws IOException
      */
-    void addMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
-    
+    void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
+
     /**
      * Adds a message to the message store
-     * 
+     *
      * @param context context
      * @param message
      * @return a Future to track when this is complete
-     * @throws IOException 
+     * @throws IOException
      * @throws IOException
      */
     Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
@@ -73,20 +74,20 @@ public interface MessageStore extends Se
      * @throws IOException
      * @throws IOException
      */
-    Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
-    
+    Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
+
     /**
      * Adds a message to the message store
-     * 
+     *
      * @param context context
      * @param message
      * @return a Future to track when this is complete
-     * @throws IOException 
+     * @throws IOException
      * @throws IOException
      */
     Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
 
-/**
+    /**
      * Adds a message to the message store
      *
      * @param context context
@@ -96,14 +97,13 @@ public interface MessageStore extends Se
      * @throws IOException
      * @throws IOException
      */
-    Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
-
+    Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
 
     /**
      * Looks up a message using either the String messageID or the
      * messageNumber. Implementations are encouraged to fill in the missing key
      * if its easy to do so.
-     * 
+     *
      * @param identity which contains either the messageID or the messageNumber
      * @return the message or null if it does not exist
      * @throws IOException
@@ -112,7 +112,7 @@ public interface MessageStore extends Se
 
     /**
      * Removes a message from the message store.
-     * 
+     *
      * @param context
      * @param ack the ack request that cause the message to be removed. It
      *                conatins the identity which contains the messageID of the
@@ -120,12 +120,12 @@ public interface MessageStore extends Se
      * @throws IOException
      */
     void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
-    
+
     void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException;
 
     /**
      * Removes all the messages from the message store.
-     * 
+     *
      * @param context
      * @throws IOException
      */
@@ -133,7 +133,7 @@ public interface MessageStore extends Se
 
     /**
      * Recover any messages to be delivered.
-     * 
+     *
      * @param container
      * @throws Exception
      */
@@ -141,7 +141,7 @@ public interface MessageStore extends Se
 
     /**
      * The destination that the message store is holding messages for.
-     * 
+     *
      * @return the destination
      */
     ActiveMQDestination getDestination();
@@ -155,13 +155,13 @@ public interface MessageStore extends Se
     /**
      * @return the number of messages ready to deliver
      * @throws IOException
-     * 
+     *
      */
     int getMessageCount() throws IOException;
 
     /**
      * A hint to the Store to reset any batching state for the Destination
-     * 
+     *
      */
     void resetBatching();
 
@@ -172,27 +172,27 @@ public interface MessageStore extends Se
     /**
      * allow caching cursors to set the current batch offset when cache is exhausted
      * @param messageId
-     * @throws Exception 
+     * @throws Exception
      */
     void setBatch(MessageId messageId) throws Exception;
-    
+
     /**
      * flag to indicate if the store is empty
      * @return true if the message count is 0
-     * @throws Exception 
+     * @throws Exception
      */
     boolean isEmpty() throws Exception;
-    
+
     /**
      * A hint to the store to try recover messages according to priority
      * @param prioritizedMessages
      */
     public void setPrioritizedMessages(boolean prioritizedMessages);
-    
+
     /**
-     * 
+     *
      * @return true if store is trying to recover messages according to priority
      */
     public boolean isPrioritizedMessages();
-    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Fri Mar  2 21:11:30 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.store;
 
 import java.io.IOException;
 import java.util.concurrent.Future;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -40,96 +41,117 @@ public class ProxyMessageStore implement
         return delegate;
     }
 
+    @Override
     public void addMessage(ConnectionContext context, Message message) throws IOException {
         delegate.addMessage(context, message);
     }
 
+    @Override
     public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
         delegate.addMessage(context,message,canOptimizeHint);
     }
 
+    @Override
     public Message getMessage(MessageId identity) throws IOException {
         return delegate.getMessage(identity);
     }
 
+    @Override
     public void recover(MessageRecoveryListener listener) throws Exception {
         delegate.recover(listener);
     }
 
+    @Override
     public void removeAllMessages(ConnectionContext context) throws IOException {
         delegate.removeAllMessages(context);
     }
 
+    @Override
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
         delegate.removeMessage(context, ack);
     }
 
+    @Override
     public void start() throws Exception {
         delegate.start();
     }
 
+    @Override
     public void stop() throws Exception {
         delegate.stop();
     }
 
+    @Override
     public void dispose(ConnectionContext context) {
         delegate.dispose(context);
     }
 
+    @Override
     public ActiveMQDestination getDestination() {
         return delegate.getDestination();
     }
 
+    @Override
     public void setMemoryUsage(MemoryUsage memoryUsage) {
         delegate.setMemoryUsage(memoryUsage);
     }
 
+    @Override
     public int getMessageCount() throws IOException {
         return delegate.getMessageCount();
     }
 
+    @Override
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         delegate.recoverNextMessages(maxReturned, listener);
-
     }
 
+    @Override
     public void resetBatching() {
         delegate.resetBatching();
-
     }
 
+    @Override
     public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
 
+    @Override
     public boolean isEmpty() throws Exception {
        return delegate.isEmpty();
     }
 
+    @Override
     public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
        return delegate.asyncAddQueueMessage(context, message);
     }
 
+    @Override
     public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
        return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
     }
 
+    @Override
     public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
         return delegate.asyncAddTopicMessage(context, message);
      }
 
+    @Override
     public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
         return asyncAddTopicMessage(context,message,canOptimizeHint);
     }
 
+    @Override
     public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
-        delegate.removeAsyncMessage(context, ack);       
+        delegate.removeAsyncMessage(context, ack);
     }
 
+    @Override
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         delegate.setPrioritizedMessages(prioritizedMessages);
     }
 
+    @Override
     public boolean isPrioritizedMessages() {
         return delegate.isPrioritizedMessages();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Fri Mar  2 21:11:30 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.store;
 
 import java.io.IOException;
 import java.util.concurrent.Future;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -41,135 +42,165 @@ public class ProxyTopicMessageStore impl
         return delegate;
     }
 
+    @Override
     public void addMessage(ConnectionContext context, Message message) throws IOException {
         delegate.addMessage(context, message);
     }
 
+    @Override
     public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
-       delegate.addMessage(context,message,canOptimizeHint);
+       delegate.addMessage(context, message, canOptimizeHint);
     }
 
+    @Override
     public Message getMessage(MessageId identity) throws IOException {
         return delegate.getMessage(identity);
     }
 
+    @Override
     public void recover(MessageRecoveryListener listener) throws Exception {
         delegate.recover(listener);
     }
 
+    @Override
     public void removeAllMessages(ConnectionContext context) throws IOException {
         delegate.removeAllMessages(context);
     }
 
+    @Override
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
         delegate.removeMessage(context, ack);
     }
 
+    @Override
     public void start() throws Exception {
         delegate.start();
     }
 
+    @Override
     public void stop() throws Exception {
         delegate.stop();
     }
 
+    @Override
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         return delegate.lookupSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                             MessageId messageId, MessageAck ack) throws IOException {
         delegate.acknowledge(context, clientId, subscriptionName, messageId, ack);
     }
 
+    @Override
     public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         delegate.addSubsciption(subscriptionInfo, retroactive);
     }
 
+    @Override
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         delegate.deleteSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
         throws Exception {
         delegate.recoverSubscription(clientId, subscriptionName, listener);
     }
 
+    @Override
     public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                     MessageRecoveryListener listener) throws Exception {
         delegate.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
     }
 
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         delegate.resetBatching(clientId, subscriptionName);
     }
 
+    @Override
     public ActiveMQDestination getDestination() {
         return delegate.getDestination();
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return delegate.getAllSubscriptions();
     }
 
+    @Override
     public void setMemoryUsage(MemoryUsage memoryUsage) {
         delegate.setMemoryUsage(memoryUsage);
     }
 
+    @Override
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         return delegate.getMessageCount(clientId, subscriberName);
     }
 
+    @Override
     public int getMessageCount() throws IOException {
         return delegate.getMessageCount();
     }
 
+    @Override
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         delegate.recoverNextMessages(maxReturned, listener);
-
     }
 
+    @Override
     public void dispose(ConnectionContext context) {
         delegate.dispose(context);
     }
 
+    @Override
     public void resetBatching() {
         delegate.resetBatching();
-
     }
 
+    @Override
     public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
-    
+
+    @Override
     public boolean isEmpty() throws Exception {
         return delegate.isEmpty();
      }
 
+    @Override
     public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
         return delegate.asyncAddTopicMessage(context, message);
      }
 
+    @Override
     public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
-        return delegate.asyncAddTopicMessage(context,message,canOptimizeHint);
+        return delegate.asyncAddTopicMessage(context,message, canOptimizeHint);
     }
 
+    @Override
     public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
         return delegate.asyncAddQueueMessage(context, message);
     }
 
+    @Override
     public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
-        return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
+        return delegate.asyncAddQueueMessage(context,message, canOptimizeHint);
     }
 
+    @Override
     public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
         delegate.removeAsyncMessage(context, ack);
     }
 
+    @Override
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         delegate.setPrioritizedMessages(prioritizedMessages);
     }
-    
+
+    @Override
     public boolean isPrioritizedMessages() {
         return delegate.isPrioritizedMessages();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Fri Mar  2 21:11:30 2012
@@ -21,7 +21,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import javax.transaction.xa.XAException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
@@ -43,16 +43,16 @@ import org.slf4j.LoggerFactory;
 /**
  * Provides a TransactionStore implementation that can create transaction aware
  * MessageStore objects from non transaction aware MessageStore objects.
- * 
- * 
+ *
+ *
  */
-public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {	
+public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
     private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
-	
+
     private final Map transactions = new ConcurrentHashMap();
     private final Map prepared;
     private final KahaPersistenceAdapter adaptor;
-    
+
     private BrokerService brokerService;
 
     KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
@@ -68,6 +68,11 @@ public class KahaTransactionStore implem
             }
 
             @Override
+            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
+                KahaTransactionStore.this.addMessage(getDelegate(), send);
+            }
+
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
             }
@@ -150,19 +155,19 @@ public class KahaTransactionStore implem
      * @throws IOException
      */
     void addMessage(final MessageStore destination, final Message message) throws IOException {
-    	try {
-    		if (message.isInTransaction()) {
-    			KahaTransaction tx = getOrCreateTx(message.getTransactionId());
-    			tx.add((KahaMessageStore)destination, message);
-    		} else {
-    			destination.addMessage(null, message);
-    		}
-    	} catch (RuntimeStoreException rse) {
+        try {
+            if (message.isInTransaction()) {
+                KahaTransaction tx = getOrCreateTx(message.getTransactionId());
+                tx.add((KahaMessageStore)destination, message);
+            } else {
+                destination.addMessage(null, message);
+            }
+        } catch (RuntimeStoreException rse) {
             if (rse.getCause() instanceof IOException) {
                 brokerService.handleIOException((IOException)rse.getCause());
             }
             throw rse;
-    	}
+        }
     }
 
     /**
@@ -170,19 +175,19 @@ public class KahaTransactionStore implem
      * @throws IOException
      */
     final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
-    	try {
-    		if (ack.isInTransaction()) {
-    			KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
-    			tx.add((KahaMessageStore)destination, ack);
-    		} else {
-    			destination.removeMessage(null, ack);
-    		}
-    	} catch (RuntimeStoreException rse) {
+        try {
+            if (ack.isInTransaction()) {
+                KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+                tx.add((KahaMessageStore)destination, ack);
+            } else {
+                destination.removeMessage(null, ack);
+            }
+        } catch (RuntimeStoreException rse) {
             if (rse.getCause() instanceof IOException) {
                 brokerService.handleIOException((IOException)rse.getCause());
             }
             throw rse;
-    	}
+        }
     }
 
     final void acknowledge(final TopicMessageStore destination, String clientId,
@@ -233,7 +238,7 @@ public class KahaTransactionStore implem
         return adaptor.retrieveMessageStore(id);
     }
 
-	public void setBrokerService(BrokerService brokerService) {
-		this.brokerService = brokerService;
-	}
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri Mar  2 21:11:30 2012
@@ -26,6 +26,7 @@ import java.util.concurrent.Cancellation
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -55,8 +56,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Provides a TransactionStore implementation that can create transaction aware
  * MessageStore objects from non transaction aware MessageStore objects.
- * 
- * 
+ *
+ *
  */
 public class KahaDBTransactionStore implements TransactionStore {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
@@ -119,7 +120,7 @@ public class KahaDBTransactionStore impl
                 cmd.run();
                 results.add(cmd.run());
             }
-            
+
             return results;
         }
     }
@@ -157,11 +158,21 @@ public class KahaDBTransactionStore impl
             }
 
             @Override
+            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
+                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
+            }
+
+            @Override
             public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
             }
 
             @Override
+            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
+            }
+
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
             }
@@ -181,11 +192,21 @@ public class KahaDBTransactionStore impl
             }
 
             @Override
+            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
+                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
+            }
+
+            @Override
             public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
             }
 
             @Override
+            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
+            }
+
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java Fri Mar  2 21:11:30 2012
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
@@ -71,11 +72,21 @@ public class MultiKahaDBTransactionStore
             }
 
             @Override
+            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
+                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
+            }
+
+            @Override
             public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
             }
 
             @Override
+            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
+            }
+
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
             }
@@ -90,11 +101,21 @@ public class MultiKahaDBTransactionStore
     public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
         return new ProxyTopicMessageStore(messageStore) {
             @Override
+            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
+                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
+            }
+
+            @Override
             public void addMessage(ConnectionContext context, final Message send) throws IOException {
                 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
             }
 
             @Override
+            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
+            }
+
+            @Override
             public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=1296469&r1=1296468&r2=1296469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
Fri Mar  2 21:11:30 2012
@@ -21,7 +21,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
-import javax.transaction.xa.XAException;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -40,8 +40,8 @@ import org.apache.activemq.store.Transac
 /**
  * Provides a TransactionStore implementation that can create transaction aware
  * MessageStore objects from non transaction aware MessageStore objects.
- * 
- * 
+ *
+ *
  */
 public class MemoryTransactionStore implements TransactionStore {
 
@@ -91,7 +91,7 @@ public class MemoryTransactionStore impl
             ConnectionContext ctx = new ConnectionContext();
             persistenceAdapter.beginTransaction(ctx);
             try {
-                
+
                 // Do all the message adds.
                 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
                     AddMessageCommand cmd = iter.next();
@@ -102,7 +102,7 @@ public class MemoryTransactionStore impl
                     RemoveMessageCommand cmd = iter.next();
                     cmd.run(ctx);
                 }
-                
+
             } catch ( IOException e ) {
                 persistenceAdapter.rollbackTransaction(ctx);
                 throw e;
@@ -110,7 +110,7 @@ public class MemoryTransactionStore impl
             persistenceAdapter.commitTransaction(ctx);
         }
     }
-    
+
     public interface AddMessageCommand {
         Message getMessage();
 
@@ -122,7 +122,7 @@ public class MemoryTransactionStore impl
 
         void run(ConnectionContext context) throws IOException;
     }
-    
+
     public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
         this.persistenceAdapter=persistenceAdapter;
     }
@@ -135,19 +135,30 @@ public class MemoryTransactionStore impl
             }
 
             @Override
+            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), send);
+            }
+
+            @Override
             public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
                 return AbstractMessageStore.FUTURE;
              }
-             
+
+            @Override
+            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), message);
+                return AbstractMessageStore.FUTURE;
+             }
+
             @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
-             
+
             @Override
             public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
-                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
+                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
         };
     }
@@ -160,19 +171,30 @@ public class MemoryTransactionStore impl
             }
 
             @Override
+            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), send);
+            }
+
+            @Override
             public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
                 return AbstractMessageStore.FUTURE;
              }
 
             @Override
+            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), message);
+                return AbstractMessageStore.FUTURE;
+             }
+
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
-            
+
             @Override
             public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
-                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
+                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
 
             @Override
@@ -285,7 +307,7 @@ public class MemoryTransactionStore impl
             destination.addMessage(null, message);
         }
     }
-    
+
     /**
      * @param ack
      * @throws IOException



Mime
View raw message