activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r685966 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: jdbc/JDBCPersistenceAdapter.java memory/MemoryPersistenceAdapter.java memory/MemoryTransactionStore.java
Date Thu, 14 Aug 2008 17:22:16 GMT
Author: chirino
Date: Thu Aug 14 10:22:15 2008
New Revision: 685966

URL: http://svn.apache.org/viewvc?rev=685966&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1886

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=685966&r1=685965&r2=685966&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Thu Aug 14 10:22:15 2008
@@ -134,7 +134,7 @@
 
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
-            transactionStore = new MemoryTransactionStore();
+            transactionStore = new MemoryTransactionStore(this);
         }
         return this.transactionStore;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=685966&r1=685965&r2=685966&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
Thu Aug 14 10:22:15 2008
@@ -89,7 +89,7 @@
 
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
-            transactionStore = new MemoryTransactionStore();
+            transactionStore = new MemoryTransactionStore(this);
         }
         return transactionStore;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=685966&r1=685965&r2=685966&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
Thu Aug 14 10:22:15 2008
@@ -29,6 +29,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
@@ -44,12 +45,12 @@
 public class MemoryTransactionStore implements TransactionStore {
 
     ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object,
Tx>();
-
     ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId,
Tx>();
+    final PersistenceAdapter persistenceAdapter;
 
     private boolean doingRecover;
 
-    public static class Tx {
+    public class Tx {
         private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
 
         private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
@@ -86,29 +87,43 @@
          * @throws IOException
          */
         public void commit() throws IOException {
-            // Do all the message adds.
-            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();)
{
-                AddMessageCommand cmd = iter.next();
-                cmd.run();
-            }
-            // And removes..
-            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();)
{
-                RemoveMessageCommand cmd = iter.next();
-                cmd.run();
+            ConnectionContext ctx = new ConnectionContext();
+            persistenceAdapter.beginTransaction(ctx);
+            try {
+                
+                // Do all the message adds.
+                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();)
{
+                    AddMessageCommand cmd = iter.next();
+                    cmd.run(ctx);
+                }
+                // And removes..
+                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();)
{
+                    RemoveMessageCommand cmd = iter.next();
+                    cmd.run(ctx);
+                }
+                
+            } catch ( IOException e ) {
+                persistenceAdapter.rollbackTransaction(ctx);
+                throw e;
             }
+            persistenceAdapter.commitTransaction(ctx);
         }
     }
-
+    
     public interface AddMessageCommand {
         Message getMessage();
 
-        void run() throws IOException;
+        void run(ConnectionContext context) throws IOException;
     }
 
     public interface RemoveMessageCommand {
         MessageAck getMessageAck();
 
-        void run() throws IOException;
+        void run(ConnectionContext context) throws IOException;
+    }
+    
+    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
+        this.persistenceAdapter=persistenceAdapter;
     }
 
     public MessageStore proxy(MessageStore messageStore) {
@@ -221,15 +236,16 @@
                     return message;
                 }
 
-                public void run() throws IOException {
-                    destination.addMessage(null, message);
+                public void run(ConnectionContext ctx) throws IOException {
+                    destination.addMessage(ctx, message);
                 }
+
             });
         } else {
             destination.addMessage(null, message);
         }
     }
-
+    
     /**
      * @param ack
      * @throws IOException
@@ -246,8 +262,8 @@
                     return ack;
                 }
 
-                public void run() throws IOException {
-                    destination.removeMessage(null, ack);
+                public void run(ConnectionContext ctx) throws IOException {
+                    destination.removeMessage(ctx, ack);
                 }
             });
         } else {



Mime
View raw message