activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r692336 - in /activemq/sandbox/kahadb/src/main: java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java java/org/apache/kahadb/store/MessageDatabase.java proto/journal-data.proto
Date Fri, 05 Sep 2008 04:55:30 GMT
Author: chirino
Date: Thu Sep  4 21:55:29 2008
New Revision: 692336

URL: http://svn.apache.org/viewvc?rev=692336&view=rev
Log:
Got XA recovery working

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/main/proto/journal-data.proto

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692336&r1=692335&r2=692336&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Thu Sep  4 21:55:29 2008
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.Map.Entry;
@@ -54,6 +55,8 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.store.MessageDatabase.AddOpperation;
+import org.apache.kahadb.store.MessageDatabase.Operation;
 import org.apache.kahadb.store.MessageDatabase.StoredDestination;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
@@ -96,17 +99,41 @@
 
     public TransactionStore createTransactionStore() throws IOException {
         return new TransactionStore(){
+            
             public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
                 store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
             }
             public void prepare(TransactionId txid) throws IOException {
                 store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
             }
-            public void recover(TransactionRecoveryListener listener) throws IOException {
-            }
             public void rollback(TransactionId txid) throws IOException {
                 store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
             }
+            public void recover(TransactionRecoveryListener listener) throws IOException {
+                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
+                    XATransactionId xid = (XATransactionId)entry.getKey();
+                    ArrayList<Message> messageList = new ArrayList<Message>();
+                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
+                    
+                    for (Operation op : entry.getValue()) {
+                        if( op.getClass() == AddOpperation.class ) {
+                            AddOpperation addOp = (AddOpperation)op;
+                            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
+                            messageList.add(msg);
+                        } else {
+                            RemoveOpperation rmOp = (RemoveOpperation)op;
+                            MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
+                            ackList.add(ack);
+                        }
+                    }
+                    
+                    Message[] addedMessages = new Message[messageList.size()];
+                    MessageAck[] acks = new MessageAck[ackList.size()];
+                    messageList.toArray(addedMessages);
+                    ackList.toArray(acks);
+                    listener.recover(xid, addedMessages, acks);
+                }
+            }
             public void start() throws Exception {
             }
             public void stop() throws Exception {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692336&r1=692335&r2=692336&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Thu Sep  4 21:55:29 2008
@@ -535,16 +535,15 @@
         }
     }
 
-    class Operation {
+    abstract class Operation {
         final Location location;
         public Operation(Location location) {
             this.location = location;
         }
-        public void execute(Transaction tx) throws IOException {
-        }
+        abstract public void execute(Transaction tx) throws IOException;
     }
     
-    private class AddOpperation extends Operation {
+    class AddOpperation extends Operation {
         final KahaAddMessageCommand command;
         
         public AddOpperation(KahaAddMessageCommand command, Location location) {
@@ -555,9 +554,13 @@
         public void execute(Transaction tx) throws IOException {
             upadateIndex(tx, command, location);
         }
+        
+        public KahaAddMessageCommand getCommand() {
+            return command;
+        }
     }
     
-    private class RemoveOpperation extends Operation {
+    class RemoveOpperation extends Operation {
         final KahaRemoveMessageCommand command;
         
         public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
@@ -568,6 +571,10 @@
         public void execute(Transaction tx) throws IOException {
             updateIndex(tx, command, location);
         }
+
+        public KahaRemoveMessageCommand getCommand() {
+            return command;
+        }
     }
     
     

Modified: activemq/sandbox/kahadb/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/journal-data.proto?rev=692336&r1=692335&r2=692336&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/journal-data.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/journal-data.proto Thu Sep  4 21:55:29 2008
@@ -61,6 +61,7 @@
   optional KahaTransactionInfo transaction_info=1;
   required KahaDestination destination = 2;
   required string messageId = 3;
+  required bytes ack = 4;
 }
 
 message KahaPrepareCommand {



Mime
View raw message