activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r692231 - in /activemq/sandbox/kahadb: ./ src/main/java/org/apache/kahadb/page/ src/main/java/org/apache/kahadb/store/ src/main/proto/ src/test/java/org/apache/kahadb/store/
Date Thu, 04 Sep 2008 20:42:28 GMT
Author: chirino
Date: Thu Sep  4 13:42:26 2008
New Revision: 692231

URL: http://svn.apache.org/viewvc?rev=692231&view=rev
Log:
- Cutting an initial rough implementation of a new ActiveMQ Message Store that is implemented on top of KahaDB.
- Enhances the BTree index so that an iterator can be started at a given key.
- Enhance the page file so that it can report the amount of disk space used.


Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/JournalCommand.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
    activemq/sandbox/kahadb/src/main/proto/
    activemq/sandbox/kahadb/src/main/proto/journal-data.proto
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaLoadTester.java   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreBrokerTest.java   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreXARecoveryBrokerTest.java   (with props)
Modified:
    activemq/sandbox/kahadb/pom.xml
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/sandbox/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/pom.xml?rev=692231&r1=692230&r2=692231&view=diff
==============================================================================
--- activemq/sandbox/kahadb/pom.xml (original)
+++ activemq/sandbox/kahadb/pom.xml Thu Sep  4 13:42:26 2008
@@ -52,17 +52,55 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>3.8.1</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.14</version>
       <scope>test</scope>
     </dependency>
+    
+    <dependency>
+      <groupId>org.apache.activemq.protobuf</groupId>
+      <artifactId>activemq-protobuf</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>        
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>5.2-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-spring</artifactId>
+      <optional>true</optional>
+      <version>3.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-core</artifactId>
+      <version>2.5.5</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-beans</artifactId>
+      <version>2.5.5</version>
+      <optional>true</optional>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>5.2-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+        
   </dependencies>
 
   <build>
@@ -75,6 +113,18 @@
           <target>1.5</target>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.activemq.protobuf</groupId>
+        <artifactId>activemq-protobuf</artifactId>
+        <version>1.0-SNAPSHOT</version>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java?rev=692231&r1=692230&r2=692231&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java Thu Sep  4 13:42:26 2008
@@ -127,11 +127,14 @@
     private BTreeNode<Key,Value> root;
 
     public BTreeIndex(PageFile pageFile, long rootPageId) {
-        super();
         this.pageFile = pageFile;
         this.pageId = rootPageId;
     }
 
+    public BTreeIndex(PageFile pageFile, Page page) {
+        this(pageFile, page.getPageId());
+    }
+
     synchronized public void load() throws IOException {
         if (loaded.compareAndSet(false, true)) {
             LOG.debug("loading");
@@ -215,6 +218,10 @@
         return root.iterator(tx);
     }
     
+    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key initialKey) throws IOException {
+        return root.iterator(tx, initialKey);
+    }
+
     synchronized Value getFirst(Transaction tx) throws IOException {
         return root.getFirst(tx);
     }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java?rev=692231&r1=692230&r2=692231&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java Thu Sep  4 13:42:26 2008
@@ -54,6 +54,78 @@
     // The next leaf node after this one.  Used for fast iteration of the entries.
     private long next = -1;
     
+    private final class BTreeIterator implements Iterator<Map.Entry<Key, Value>> {
+        private final Transaction tx;
+        BTreeNode<Key,Value> current;
+        int nextIndex;
+        Map.Entry<Key,Value> nextEntry;
+
+        private BTreeIterator(Transaction tx, BTreeNode<Key,Value> current, int nextIndex) {
+            this.tx = tx;
+            this.current = current;
+            this.nextIndex=nextIndex;
+        }
+
+        synchronized private void findNextPage() {
+            if( nextEntry!=null ) {
+                return;
+            }
+            
+            try {
+                while( current!=null ) {
+                    if( nextIndex >= current.keys.length ) {
+                        // we need to roll to the next leaf..
+                        if( current.next >= 0 ) {
+                            current = index.loadNode(tx, current.next, null);
+                            nextIndex=0;
+                        } else {
+                            break;
+                        }
+                    }  else {
+                        nextEntry = new Map.Entry<Key, Value>() {
+                            private final Key key = current.keys[nextIndex];
+                            private final Value value = current.values[nextIndex];
+                            
+                            public Key getKey() {
+                                return key;
+                            }
+                            public Value getValue() {
+                                return value;
+                            }
+                            public Value setValue(Value value) {
+                                throw new UnsupportedOperationException();
+                            }
+                        };
+                        nextIndex++;
+                        break;
+                    }
+                    
+                }
+            } catch (IOException e) {
+            }
+        }
+
+        public boolean hasNext() {
+            findNextPage();
+            return nextEntry !=null;
+        }
+
+        public Entry<Key, Value> next() {
+            findNextPage(); 
+            if( nextEntry !=null ) {
+                Entry<Key, Value> lastEntry = nextEntry;
+                nextEntry=null;
+                return lastEntry;
+            } else {
+                throw new NoSuchElementException();
+            }
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     /**
      * The Marshaller is used to store and load the data in the BTreeNode into a Page.
      *  
@@ -419,8 +491,6 @@
             throw new IllegalArgumentException("Key cannot be null");
         }
         if( isBranch() ) {
-            
-            
             return getLeafNode(tx, this, key).get(tx, key);
         } else {
             int idx = Arrays.binarySearch(keys, key);
@@ -452,76 +522,23 @@
         return node;
     }
     
-    
-    public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
-        
-        return new Iterator<Map.Entry<Key,Value>>() {
-            
-            BTreeNode<Key,Value> current = getFirstLeafNode(tx);
-            int nextIndex;
-            
-            Map.Entry<Key,Value> nextEntry;
-            
-            synchronized private void findNextPage() {
-                if( nextEntry!=null ) {
-                    return;
-                }
-                
-                try {
-                    while( current!=null ) {
-                        if( nextIndex >= current.keys.length ) {
-                            // we need to roll to the next leaf..
-                            if( current.next >= 0 ) {
-                                current = index.loadNode(tx, current.next, null);
-                                nextIndex=0;
-                            } else {
-                                break;
-                            }
-                        }  else {
-                            nextEntry = new Map.Entry<Key, Value>() {
-                                private final Key key = current.keys[nextIndex];
-                                private final Value value = current.values[nextIndex];
-                                
-                                public Key getKey() {
-                                    return key;
-                                }
-                                public Value getValue() {
-                                    return value;
-                                }
-                                public Value setValue(Value value) {
-                                    throw new UnsupportedOperationException();
-                                }
-                            };
-                            nextIndex++;
-                            break;
-                        }
-                        
-                    }
-                } catch (IOException e) {
-                }
-            }
-
-            public boolean hasNext() {
-                findNextPage();
-                return nextEntry !=null;
-            }
-
-            public Entry<Key, Value> next() {
-                findNextPage(); 
-                if( nextEntry !=null ) {
-                    Entry<Key, Value> lastEntry = nextEntry;
-                    nextEntry=null;
-                    return lastEntry;
-                } else {
-                    throw new NoSuchElementException();
-                }
-            }
-            
-            public void remove() {
-                throw new UnsupportedOperationException();
+    public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key startKey) throws IOException {
+        if (startKey == null) {
+            return iterator(tx);
+        }
+        if( isBranch() ) {
+            return getLeafNode(tx, this, startKey).iterator(tx, startKey);
+        } else {
+            int idx = Arrays.binarySearch(keys, startKey);
+            if (idx < 0) {
+                idx = -(idx + 1);
             }
+            return new BTreeIterator(tx, this, idx);
+        }
+    }
 
-        };
+    public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
+        return new BTreeIterator(tx, getFirstLeafNode(tx), 0);
     }
     
     public void clear(Transaction tx) throws IOException {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=692231&r1=692230&r2=692231&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Sep  4 13:42:26 2008
@@ -1451,5 +1451,9 @@
     public void setEnableAsyncWrites(boolean enableAsyncWrites) {
         this.enableAsyncWrites = enableAsyncWrites;
     }
+
+    public long getDiskSize() throws IOException {
+        return readFile.length();
+    }
     
 }

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/JournalCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/JournalCommand.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/JournalCommand.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/JournalCommand.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.IOException;
+
+import org.apache.kahadb.store.data.KahaEntryType;
+
+public interface JournalCommand<T> extends org.apache.activemq.protobuf.Message<T> {
+
+    public void visit(Visitor visitor) throws IOException;
+
+    public KahaEntryType type();
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import com.google.protobuf.ByteString;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.impl.async.Location;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.store.MessageDatabase.StoredDestination;
+import org.apache.kahadb.store.data.KahaAddMessageCommand;
+import org.apache.kahadb.store.data.KahaCommitCommand;
+import org.apache.kahadb.store.data.KahaDestination;
+import org.apache.kahadb.store.data.KahaLocalTransactionId;
+import org.apache.kahadb.store.data.KahaLocation;
+import org.apache.kahadb.store.data.KahaPrepareCommand;
+import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
+import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
+import org.apache.kahadb.store.data.KahaRollbackCommand;
+import org.apache.kahadb.store.data.KahaTransactionInfo;
+import org.apache.kahadb.store.data.KahaXATransactionId;
+import org.apache.kahadb.store.data.KahaDestination.DestinationType;
+
+public class KahaDBPersistenceAdaptor extends MessageDatabase implements PersistenceAdapter {
+
+    private String brokerName;
+    private SystemUsage usageManager;
+    private WireFormat wireFormat = new OpenWireFormat();
+    private AtomicBoolean started = new AtomicBoolean();
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+    
+    public void setUsageManager(SystemUsage usageManager) {
+        this.usageManager = usageManager;
+    }
+
+    public void start() throws Exception {
+        if ( started.compareAndSet(false,true) ) {
+            load();
+        }
+    }
+    public void stop() throws Exception {
+        if ( started.compareAndSet(true,false) ) {
+            unload();
+        }
+    }
+
+    public TransactionStore createTransactionStore() throws IOException {
+        return new TransactionStore(){
+            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+                store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+            }
+            public void prepare(TransactionId txid) throws IOException {
+                store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+            }
+            public void recover(TransactionRecoveryListener listener) throws IOException {
+            }
+            public void rollback(TransactionId txid) throws IOException {
+                store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
+            }
+            public void start() throws Exception {
+            }
+            public void stop() throws Exception {
+            }
+        };
+    }
+
+    class KahaDBMessageStore implements MessageStore {
+        private final ActiveMQDestination destination;
+        private KahaDestination dest;
+
+        public KahaDBMessageStore(ActiveMQDestination destination) {
+            this.destination = destination;
+            this.dest = convert( destination );
+        }
+
+        public ActiveMQDestination getDestination() {
+            return destination;
+        }
+
+        public void addMessage(ConnectionContext context, Message message) throws IOException {
+            KahaAddMessageCommand command = new KahaAddMessageCommand();
+            command.setDestination(dest);
+            command.setMessageId(message.getMessageId().toString());
+            command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) );
+
+            ByteSequence packet = wireFormat.marshal(message);
+            command.setMessage(ByteString.copyFrom(packet.getData(), packet.getOffset(), packet.getLength()));
+
+            store(command, message.isResponseRequired());
+        }
+        
+        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
+            command.setDestination(dest);
+            command.setMessageId(ack.getLastMessageId().toString());
+            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
+            store(command, ack.isResponseRequired());
+        }
+
+        public void removeAllMessages(ConnectionContext context) throws IOException {
+            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
+            command.setDestination(dest);
+            store(command, true);
+        }
+
+        public Message getMessage(MessageId identity) throws IOException {
+            final String key = identity.toString();
+            
+            // Hopefully one day the page file supports concurrent read operations... but for now we must
+            // externally synchronize...
+            Location location;
+            synchronized(indexMutex) {
+                location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){
+                    public Location execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        return sd.messageIdIndex.get(tx, key);
+                    }
+                });
+            }
+            if( location == null ) {
+                return null;
+            }
+            
+            return loadMessage(location);
+        }
+        
+        public int getMessageCount() throws IOException {
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    public Integer execute(Transaction tx) throws IOException {
+                        // Iterate through all index entries to get a count of messages in the destination.
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        int rc=0;
+                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                            iterator.next();
+                            rc++;
+                        }
+                        return rc;
+                    }
+                });
+            }
+        }
+
+        public void recover(final MessageRecoveryListener listener) throws Exception {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                            Entry<Location, String> entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                        }
+                    }
+                });
+            }
+        }
+
+        Location cursorPos=null;
+        
+        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Entry<Location, String> entry=null;
+                        int counter = 0;
+                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                            entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                            counter++;
+                            if( counter >= maxReturned ) {
+                                break;
+                            }
+                        }
+                        if( entry!=null ) {
+                            cursorPos = entry.getKey();
+                            cursorPos.setOffset( cursorPos.getOffset()+1 );
+                        }
+                    }
+                });
+            }
+        }
+
+        public void resetBatching() {
+            cursorPos=null;
+        }
+
+        public void setMemoryUsage(MemoryUsage memoeyUSage) {
+        }
+        public void start() throws Exception {
+        }
+        public void stop() throws Exception {
+        }
+        
+    }
+    
+    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
+        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
+            super(destination);
+        }
+
+        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+        }
+
+        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+        }
+
+        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+        }
+
+        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+            return null;
+        }
+
+        public int getMessageCount(String clientId, String subscriberName) throws IOException {
+            return 0;
+        }
+
+        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+            return null;
+        }
+
+        public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+        }
+
+        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+        }
+
+        public void resetBatching(String clientId, String subscriptionName) {
+        }
+    }
+
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        return new KahaDBMessageStore(destination);
+    }
+
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+        throw new IOException("Not yet implemented.");
+//        return new KahaDBTopicMessageStore(destination);
+    }
+    
+    public void deleteAllMessages() throws IOException {
+        deleteAllMessages=true;
+    }
+    
+    
+    public Set<ActiveMQDestination> getDestinations() {
+        try {
+            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
+                            Entry<String, StoredDestination> entry = iterator.next();
+                            rc.add(convert(entry.getKey()));
+                        }
+                    }
+                });
+            }
+            return rc;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        return 0;
+    }
+    
+    public long size() {
+        if ( !started.get() ) {
+            return 0;
+        }
+        try {
+            long diskSize; 
+            synchronized(indexMutex) {
+                diskSize = pageFile.getDiskSize();
+            }
+            return asyncDataManager.getDiskSize() + diskSize;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void beginTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    public void commitTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    public void checkpoint(boolean sync) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    
+    
+    ///////////////////////////////////////////////////////////////////
+    // Internal helper methods.
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * @param location
+     * @return
+     * @throws IOException
+     */
+    private Message loadMessage(Location location) throws IOException {
+        KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
+        Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) );
+        return msg;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal conversion methods.
+    ///////////////////////////////////////////////////////////////////
+    
+    private KahaTransactionInfo createTransactionInfo(TransactionId txid) {
+        if( txid ==null ) {
+            return null;
+        }
+        KahaTransactionInfo rc = new KahaTransactionInfo();
+        
+        // Link it up to the previous record that was part of the transaction.
+        ArrayList<Operation> tx = inflightTransactions.get(txid);
+        if( tx!=null ) {
+            rc.setPreviousEntry(convert(tx.get(tx.size()-1).location));
+        }
+        
+        if( txid.isLocalTransaction() ) {
+            LocalTransactionId t = (LocalTransactionId)txid;
+            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
+            kahaTxId.setConnectionId(t.getConnectionId().getValue());
+            kahaTxId.setTransacitonId(t.getValue());
+            rc.setLocalTransacitonId(kahaTxId);
+        } else {
+            XATransactionId t = (XATransactionId)txid;
+            KahaXATransactionId kahaTxId = new KahaXATransactionId();
+            kahaTxId.setBranchQualifier(ByteString.copyFrom(t.getBranchQualifier()));
+            kahaTxId.setGlobalTransactionId(ByteString.copyFrom(t.getGlobalTransactionId()));
+            kahaTxId.setFormatId(t.getFormatId());
+            rc.setXaTransacitonId(kahaTxId);
+        }
+        return rc;
+    }
+    
+    private KahaLocation convert(Location location) {
+        KahaLocation rc = new KahaLocation();
+        rc.setLogId(location.getDataFileId());
+        rc.setOffset(location.getOffset());
+        return rc;
+    }
+    
+    private KahaDestination convert(ActiveMQDestination dest) {
+        KahaDestination rc = new KahaDestination();
+        rc.setName(dest.getPhysicalName());
+        switch( dest.getDestinationType() ) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            rc.setType(DestinationType.QUEUE);
+            return rc;
+        case ActiveMQDestination.TOPIC_TYPE:
+            rc.setType(DestinationType.TOPIC);
+            return rc;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            rc.setType(DestinationType.TEMP_QUEUE);
+            return rc;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            rc.setType(DestinationType.TEMP_TOPIC);
+            return rc;
+        default:
+            return null;
+        }
+    }
+
+    private ActiveMQDestination convert(String dest) {
+        int p = dest.indexOf(":");
+        if( p<0 ) {
+            throw new IllegalArgumentException("Not in the valid destination format");
+        }
+        int type = Integer.parseInt(dest.substring(0, p));
+        String name = dest.substring(p+1);
+        
+        switch( KahaDestination.DestinationType.valueOf(type) ) {
+        case QUEUE:
+            return new ActiveMQQueue(name);
+        case TOPIC:
+            return new ActiveMQTopic(name);
+        case TEMP_QUEUE:
+            return new ActiveMQTempQueue(name);
+        case TEMP_TOPIC:
+            return new ActiveMQTempTopic(name);
+        default:    
+            throw new IllegalArgumentException("Not in the valid destination format");
+        }
+    }
+    
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.impl.async.AsyncDataManager;
+import org.apache.kahadb.impl.async.Location;
+import org.apache.kahadb.page.BTreeIndex;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.store.data.KahaAddMessageCommand;
+import org.apache.kahadb.store.data.KahaCommitCommand;
+import org.apache.kahadb.store.data.KahaDestination;
+import org.apache.kahadb.store.data.KahaEntryType;
+import org.apache.kahadb.store.data.KahaLocalTransactionId;
+import org.apache.kahadb.store.data.KahaPrepareCommand;
+import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
+import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
+import org.apache.kahadb.store.data.KahaRollbackCommand;
+import org.apache.kahadb.store.data.KahaTraceCommand;
+import org.apache.kahadb.store.data.KahaTransactionInfo;
+import org.apache.kahadb.store.data.KahaXATransactionId;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+
+public class MessageDatabase {
+
+
+    private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
+    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
+
+    public static final int CLOSED_STATE = 1;
+    public static final int OPEN_STATE = 2;
+    
+    protected class Metadata {
+        protected Page<Metadata> page;
+        protected int state;
+        protected BTreeIndex<String, StoredDestination> destinations;
+
+        public void read(DataInput is) throws IOException {
+            state = is.readInt();
+            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
+        }
+
+        public void write(DataOutput os) throws IOException {
+            os.writeInt(state);
+            os.writeLong(destinations.getPageId());
+        }
+    }
+
+    /** Marshaller that delegates to Metadata's own read and write methods. */
+    class MetadataMarshaller implements Marshaller<Metadata> {
+        public Class<Metadata> getType() {
+            return Metadata.class;
+        }
+
+        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
+            object.write(dataOut);
+        }
+
+        public Metadata readPayload(DataInput dataIn) throws IOException {
+            Metadata loaded = new Metadata();
+            loaded.read(dataIn);
+            return loaded;
+        }
+    }
+
+    
+    protected PageFile pageFile;
+    protected  AsyncDataManager asyncDataManager;
+    protected  Metadata metadata = new Metadata();
+
+    protected  MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
+
+    protected  boolean failIfJournalIsLocked;
+
+    protected  boolean deleteAllMessages;
+    protected  File directory;
+    protected boolean recovering;
+    protected  Thread checkpointThread;
+
+    /** Creates an unloaded database; nothing is opened until load() is called. */
+    public MessageDatabase() {
+    }
+
+    /**
+     * Opens the store: locks and starts the journal, optionally deletes all
+     * existing data, loads (or initializes) the metadata page and the
+     * destinations index, and finally replays the journal.
+     *
+     * @throws IOException if the journal or the page file cannot be opened, or
+     *             (as an InterruptedIOException) if the calling thread is
+     *             interrupted while waiting for the journal lock
+     */
+    public void load() throws IOException {
+
+        if (asyncDataManager == null) {
+            asyncDataManager = createAsyncDataManager();
+        }
+        lockJournal();
+        if (pageFile == null) {
+            pageFile = createPageFile();
+        }
+
+        asyncDataManager.start();
+        if (deleteAllMessages) {
+            pageFile.delete();
+            asyncDataManager.delete();
+
+            store(new KahaTraceCommand().setMessage("DELETED " + new Date()));
+
+            LOG.info("Journal deleted: ");
+            deleteAllMessages = false;
+        }
+
+        pageFile.load();
+        pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                if (tx.getPageCount() == 0) {
+                    // First time this is created.. Initialize the metadata
+                    Page<Metadata> page = tx.allocate();
+                    assert page.getPageId() == 0;
+                    page.set(metadata);
+                    metadata.page = page;
+                    metadata.state = CLOSED_STATE;
+                    metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
+                    tx.store(metadata.page, metadataMarshaller, true);
+                } else {
+                    // Existing store: the metadata always lives on page 0.
+                    Page<Metadata> page = tx.load(0, metadataMarshaller);
+                    metadata = page.get();
+                    metadata.page = page;
+                }
+                metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
+                metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
+                metadata.destinations.load();
+            }
+        });
+
+        // Replay the journal to get the indexes up to date with the latest
+        // updates.
+        recover();
+        
+        // NOTE: the checkpoint thread is only created here, it is not started.
+        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+            public void run() {
+                doCheckpoint();
+            }
+        };
+
+    }
+
+    /**
+     * Acquires the journal lock.  When failIfJournalIsLocked is set a single
+     * attempt is made; otherwise the attempt is repeated every
+     * JOURNAL_LOCKED_WAIT_DELAY milliseconds until it succeeds.
+     *
+     * @throws IOException if the single attempt fails, or if the thread is
+     *             interrupted while waiting between attempts
+     */
+    private void lockJournal() throws IOException {
+        if (failIfJournalIsLocked) {
+            asyncDataManager.lock();
+            return;
+        }
+        while (true) {
+            try {
+                asyncDataManager.lock();
+                return;
+            } catch (IOException e) {
+                LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
+                try {
+                    Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+                } catch (InterruptedException e1) {
+                    // Do not swallow the interrupt: restore the flag and stop
+                    // waiting so that the caller can shut down promptly.
+                    Thread.currentThread().interrupt();
+                    java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException("Interrupted while waiting for the journal to be unlocked.");
+                    interrupted.initCause(e1);
+                    throw interrupted;
+                }
+            }
+        }
+    }
+    
+    /**
+     * Unloads the destinations index, persists the metadata page with the
+     * state set to CLOSED_STATE, and then resets the in-memory metadata.
+     */
+    public void unload() throws IOException {
+        final Metadata closing = metadata;
+        closing.destinations.unload();
+        closing.state = CLOSED_STATE;
+        pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                tx.store(closing.page, metadataMarshaller, true);
+            }
+        });
+        metadata = new Metadata();
+    }
+
+
+    /**
+     * Replays every record found in the journal through process(), then writes
+     * a synced "RECOVERED" trace record and sets the journal mark to it.
+     * 
+     * @throws IOException if a journal record cannot be read or applied
+     * @throws IllegalStateException declared for callers; not thrown directly here
+     */
+    private void recover() throws IllegalStateException, IOException {
+        int replayed = 0;
+        LOG.info("Journal Recovery Started from: " + asyncDataManager);
+        long startedAt = System.currentTimeMillis();
+
+        // Walk the journal from its first record (null cursor) to the end.
+        Location cursor = null;
+        for (;;) {
+            cursor = asyncDataManager.getNextLocation(cursor);
+            if (cursor == null) {
+                break;
+            }
+            process(load(cursor), cursor);
+            replayed++;
+        }
+
+        Location mark = store(new KahaTraceCommand().setMessage("RECOVERED " + new Date()), true);
+        asyncDataManager.setMark(mark, true);
+        long finishedAt = System.currentTimeMillis();
+        LOG.info("Replayed " + replayed + " operations from redo log in " + ((finishedAt - startedAt) / 1000.0f) + " seconds.");
+    }
+
+    /** Body of the checkpoint worker thread; currently does nothing. */
+    private void doCheckpoint() {
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Methods called by the broker to update and query the store.
+    ///////////////////////////////////////////////////////////////////
+    /**
+     * Journals and applies the command without requesting a synced write.
+     *
+     * @return the journal location the command was written to
+     */
+    public Location store(JournalCommand data) throws IOException {
+        final boolean sync = false;
+        return store(data, sync);
+    }
+    
+    /**
+     * All updates are funneled through this method.  The update is written to
+     * the journal as a JournalCommand and the same command is then used to
+     * update the index, just like it would be done during a recovery.
+     *
+     * @param data the command to journal and apply
+     * @param sync passed through to the journal write
+     * @return the journal location the command was written to
+     */
+    public Location store(JournalCommand data, boolean sync) throws IOException {
+        // One leading byte carries the entry type so that load() can pick the
+        // right message class; the serialized command follows it.
+        DataByteArrayOutputStream buffer = new DataByteArrayOutputStream(data.serializedSize() + 1);
+        buffer.writeByte(data.type().getNumber());
+        data.writeTo(buffer);
+
+        Location written = asyncDataManager.write(buffer.getByteSequence(), sync);
+        process(data, written);
+        return written;
+    }
+
+
+    /**
+     * Loads a previously stored JournalCommand.
+     * 
+     * @param location the journal location of the record
+     * @return the command decoded from that record
+     * @throws IOException if the record cannot be read, or if its leading type
+     *             byte does not identify a known KahaEntryType
+     */
+    public JournalCommand load(Location location) throws IOException {
+        ByteSequence data = asyncDataManager.read(location);
+        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
+        // The first byte is the entry type written by store().
+        KahaEntryType type = KahaEntryType.valueOf(is.readByte());
+        if (type == null) {
+            // Report an unrecognised type byte as an I/O problem rather than
+            // letting it surface as a NullPointerException below.
+            throw new IOException("Could not load journal record. Invalid location: " + location);
+        }
+        JournalCommand message = (JournalCommand)type.createMessage();
+        message.mergeFrom(is);
+        return message;
+    }
+    
+    ///////////////////////////////////////////////////////////////////
+    // Journaled record processing methods. Once the record is journaled,
+    // these methods handle applying the index updates. These may be called
+    // from the recovery method too so they need to be idempotent
+    ///////////////////////////////////////////////////////////////////
+    
+    private void process(JournalCommand data, final Location location) throws IOException {
+        data.visit(new Visitor() {
+            @Override
+            public void visit(KahaAddMessageCommand command) throws IOException {
+                process(command, location);
+            }
+
+            @Override
+            public void visit(KahaRemoveMessageCommand command) throws IOException {
+                process(command, location);
+            }
+
+            @Override
+            public void visit(KahaPrepareCommand command) throws IOException {
+                process(command, location);
+            }
+
+            @Override
+            public void visit(KahaCommitCommand command) throws IOException {
+                process(command, location);
+            }
+
+            @Override
+            public void visit(KahaRollbackCommand command) throws IOException {
+                process(command, location);
+            }
+            
+            @Override
+            public void visit(KahaRemoveDestinationCommand command) throws IOException {
+                process(command, location);
+            }
+        });
+    }
+
+    /**
+     * Applies an add: a transacted add is queued on its in-flight transaction,
+     * a non-transacted add updates the index immediately.
+     */
+    private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
+        if (!command.hasTransactionInfo()) {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        upadateIndex(tx, command, location);
+                    }
+                });
+            }
+            return;
+        }
+        // Deferred until the owning transaction commits.
+        getInflightTx(command.getTransactionInfo(), location).add(new AddOpperation(command, location));
+    }
+
+    /**
+     * Applies a remove: a transacted remove is queued on its in-flight
+     * transaction, a non-transacted remove updates the index immediately.
+     */
+    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
+        if (!command.hasTransactionInfo()) {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        updateIndex(tx, command, location);
+                    }
+                });
+            }
+            return;
+        }
+        // Deferred until the owning transaction commits.
+        getInflightTx(command.getTransactionInfo(), location).add(new RemoveOpperation(command, location));
+    }
+    
+    /** Applies a destination removal to the index inside a page file transaction. */
+    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
+        Transaction.Closure<IOException> removal = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                updateIndex(tx, command, location);
+            }
+        };
+        synchronized(indexMutex) {
+            pageFile.tx().execute(removal);
+        }
+    }
+    
+    /**
+     * Commits a transaction: its queued operations (looked up first among the
+     * in-flight and then among the prepared transactions) are executed in one
+     * page file transaction.  An unknown transaction id is ignored.
+     */
+    protected void process(KahaCommitCommand command, Location location) throws IOException {
+        TransactionId key = key(command.getTransactionInfo());
+        ArrayList<Operation> pending = inflightTransactions.remove(key);
+        if (pending == null) {
+            pending = preparedTransactions.remove(key);
+            if (pending == null) {
+                return;
+            }
+        }
+
+        final ArrayList<Operation> operations = pending;
+        synchronized(indexMutex) {
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    for (Operation operation : operations) {
+                        operation.execute(tx);
+                    }
+                }
+            });
+        }
+    }
+    
+    /** Moves the transaction's queued operations from the in-flight map to the prepared map. */
+    protected void process(KahaPrepareCommand command, Location location) {
+        TransactionId key = key(command.getTransactionInfo());
+        ArrayList<Operation> operations = inflightTransactions.remove(key);
+        if (operations == null) {
+            // Unknown transaction: nothing to prepare.
+            return;
+        }
+        preparedTransactions.put(key, operations);
+    }
+
+    /** Discards the transaction's queued operations, whether in-flight or prepared. */
+    protected void process(KahaRollbackCommand command, Location location) {
+        TransactionId key = key(command.getTransactionInfo());
+        boolean wasInflight = inflightTransactions.remove(key) != null;
+        if (!wasInflight) {
+            preparedTransactions.remove(key);
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // These methods do the actual index updates.
+    ///////////////////////////////////////////////////////////////////
+    
+    protected final Object indexMutex = new Object();
+    
+    private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+        sd.orderIndex.put(tx, location, command.getMessageId());
+        sd.messageIdIndex.put(tx, command.getMessageId(), location);
+    }
+
+    /**
+     * Removes a message from both indexes of its destination.
+     *
+     * @param tx       the page file transaction to apply the update in
+     * @param command  identifies the destination and the message id to remove
+     * @param location journal location of the REMOVE command itself; it is not
+     *                 an order index key and is therefore not used for the lookup
+     */
+    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location location) throws IOException {
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+        // The order index is keyed by the location of the ADD record, so that
+        // location has to be looked up through the message id first.
+        Location addLocation = sd.messageIdIndex.get(tx, command.getMessageId());
+        if (addLocation != null) {
+            sd.orderIndex.remove(tx, addLocation);
+            sd.messageIdIndex.remove(tx, command.getMessageId());
+        }
+        // A missing entry means the message is already gone (e.g. the command
+        // is being replayed), which keeps this update idempotent.
+    }
+    
+    /**
+     * Removes a destination: clears and unloads both of its indexes, frees the
+     * pages identified by their page ids, and drops the destination from the
+     * in-memory cache and from the metadata destinations index.
+     */
+    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+        sd.orderIndex.clear(tx);
+        sd.messageIdIndex.clear(tx);
+        sd.orderIndex.unload();
+        sd.messageIdIndex.unload();
+        tx.free(sd.orderIndex.getPageId());
+        tx.free(sd.messageIdIndex.getPageId());
+
+        // Forget the destination both in the cache and in the persistent index.
+        String key = key(command.getDestination());
+        storedDestinations.remove(key);
+        metadata.destinations.remove(tx, key);
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // StoredDestination related implementation methods.
+    ///////////////////////////////////////////////////////////////////
+
+    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+
+    /** The pair of indexes kept for one destination. */
+    static class StoredDestination {
+        // Journal location -> message id.
+        BTreeIndex<Location, String> orderIndex;
+        // Message id -> journal location.
+        BTreeIndex<String, Location> messageIdIndex;
+    }
+    
+    protected class StoredDestinationMarshaller implements Marshaller<StoredDestination> {
+        public Class<StoredDestination> getType() {
+            return StoredDestination.class;
+        }
+
+        public StoredDestination readPayload(DataInput dataIn) throws IOException {
+            StoredDestination rc = new StoredDestination();
+            rc.orderIndex = new BTreeIndex<Location, String>(pageFile, dataIn.readLong());
+            rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
+            return rc;
+        }
+
+        public void writePayload(StoredDestination object, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(object.orderIndex.getPageId());
+            dataOut.writeLong(object.messageIdIndex.getPageId());
+        }
+    }
+    
+    static class LocationMarshaller implements Marshaller<Location> {
+        final static LocationMarshaller INSTANCE = new  LocationMarshaller();
+        
+        public Class<Location> getType() {
+            return Location.class;
+        }
+
+        public Location readPayload(DataInput dataIn) throws IOException {
+            Location rc = new Location();
+            rc.setDataFileId(dataIn.readInt());
+            rc.setOffset(dataIn.readInt());
+            return rc;
+        }
+
+        public void writePayload(Location object, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(object.getDataFileId());
+            dataOut.writeInt(object.getOffset());
+        }
+    }
+
+    /**
+     * Returns the indexes of a destination, loading them on first use and
+     * creating them when the destination has never been seen before.
+     *
+     * @param destination the destination whose indexes are wanted
+     * @param tx          the page file transaction used for lookups and allocation
+     * @return the cached, loaded StoredDestination; never null
+     */
+    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
+        String key = key(destination);
+        StoredDestination rc = storedDestinations.get(key);
+        if (rc == null) {
+            // Try to load the existing indexes..
+            rc = metadata.destinations.get(tx, key);
+            if( rc ==null ) {
+                // Brand new destination.. allocate indexes for it.
+                rc = new StoredDestination();
+                rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
+                rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+                // Record the new destination in the metadata index; without this
+                // the lookup above can never find it again once the in-memory
+                // cache is gone, and fresh pages would be allocated every time.
+                metadata.destinations.put(tx, key, rc);
+            }
+            
+            // Configure the marshalers and load.
+            rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
+            rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
+            rc.orderIndex.load();
+            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
+            rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
+            rc.messageIdIndex.load();
+            
+            // Cache it.  We may want to remove/unload destinations from the cache that are not used for a while
+            // to reduce memory usage.
+            storedDestinations.put(key, rc);
+        }
+        return rc;
+    }
+    
+    /** Builds the destination key: the type number, a ':' and the destination name. */
+    private String key(KahaDestination destination) {
+        int typeNumber = destination.getType().getNumber();
+        return typeNumber + ":" + destination.getName();
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Transaction related implementation methods.
+    ///////////////////////////////////////////////////////////////////
+    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
+    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
+       
+
+    /**
+     * Returns the operation list of the in-flight transaction identified by
+     * info, registering a new empty list when the transaction is not known yet.
+     */
+    private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
+        TransactionId key = key(info);
+        ArrayList<Operation> operations = inflightTransactions.get(key);
+        if (operations != null) {
+            return operations;
+        }
+        operations = new ArrayList<Operation>();
+        inflightTransactions.put(key, operations);
+        return operations;
+    }
+
+    /**
+     * Converts the journaled transaction info into a TransactionId: a
+     * LocalTransactionId when a local id is present, an XATransactionId otherwise.
+     */
+    private TransactionId key(KahaTransactionInfo transactionInfo) {
+        if (!transactionInfo.hasLocalTransacitonId()) {
+            KahaXATransactionId xa = transactionInfo.getXaTransacitonId();
+            XATransactionId xaId = new XATransactionId();
+            xaId.setBranchQualifier(xa.getBranchQualifier().toByteArray());
+            xaId.setGlobalTransactionId(xa.getGlobalTransactionId().toByteArray());
+            xaId.setFormatId(xa.getFormatId());
+            return xaId;
+        }
+
+        KahaLocalTransactionId local = transactionInfo.getLocalTransacitonId();
+        LocalTransactionId localId = new LocalTransactionId();
+        localId.setConnectionId(new ConnectionId(local.getConnectionId()));
+        localId.setValue(local.getTransacitonId());
+        return localId;
+    }
+
+    /**
+     * An index update queued on a transaction.  The base implementation of
+     * execute does nothing; subclasses apply the actual update.
+     */
+    class Operation {
+        // Journal location of the command this operation was created for.
+        final Location location;
+        public Operation(Location location) {
+            this.location = location;
+        }
+        public void execute(Transaction tx) throws IOException {
+        }
+    }
+    
+    /** Queued add of a message; applies the add index update when executed. */
+    private class AddOpperation extends Operation {
+        final KahaAddMessageCommand command;
+        
+        public AddOpperation(KahaAddMessageCommand command, Location location) {
+            super(location);
+            this.command = command;
+        }
+
+        @Override
+        public void execute(Transaction tx) throws IOException {
+            upadateIndex(tx, this.command, this.location);
+        }
+    }
+    
+    /** Queued removal of a message; applies the remove index update when executed. */
+    private class RemoveOpperation extends Operation {
+        final KahaRemoveMessageCommand command;
+        
+        public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
+            super(location);
+            this.command = command;
+        }
+
+        @Override
+        public void execute(Transaction tx) throws IOException {
+            updateIndex(tx, this.command, this.location);
+        }
+    }
+    
+    
+    ///////////////////////////////////////////////////////////////////
+    // Initialization related implementation methods.
+    ///////////////////////////////////////////////////////////////////
+
+    
+    /** Creates the page file named "database" inside the store directory. */
+    private PageFile createPageFile() {
+        return new PageFile(directory, "database");
+    }
+
+    private AsyncDataManager createAsyncDataManager() {
+        AsyncDataManager manager = new AsyncDataManager();
+        manager.setDirectory(new File(directory, "journal"));
+        manager.setMaxFileLength(1024 * 1024 * 20);
+        manager.setUseNio(false);
+        return manager;
+    }
+
+    /** @return the directory the page file and the journal are created under */
+    public File getDirectory() {
+        return directory;
+    }
+
+    /** @param directory the directory the page file and the journal are created under */
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    /** @return whether load() will delete the page file and the journal */
+    public boolean isDeleteAllMessages() {
+        return deleteAllMessages;
+    }
+
+    /**
+     * @param deleteAllMessages when true, the next load() deletes the page file
+     *            and the journal and then resets this flag to false
+     */
+    public void setDeleteAllMessages(boolean deleteAllMessages) {
+        this.deleteAllMessages = deleteAllMessages;
+    }
+
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/Visitor.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.IOException;
+
+import org.apache.kahadb.store.data.KahaAddMessageCommand;
+import org.apache.kahadb.store.data.KahaCommitCommand;
+import org.apache.kahadb.store.data.KahaPrepareCommand;
+import org.apache.kahadb.store.data.KahaRemoveDestinationCommand;
+import org.apache.kahadb.store.data.KahaRemoveMessageCommand;
+import org.apache.kahadb.store.data.KahaRollbackCommand;
+import org.apache.kahadb.store.data.KahaTraceCommand;
+
+/**
+ * Visitor over the journal command types.  Every method is a no-op so that
+ * subclasses only need to override the commands they care about.
+ */
+public class Visitor {
+
+    // NOTE(review): unlike the other methods this one declares no IOException.
+    public void visit(KahaTraceCommand command) {
+    }
+
+    public void visit(KahaRollbackCommand command) throws IOException {
+    }
+
+    public void visit(KahaRemoveMessageCommand command) throws IOException {
+    }
+
+    public void visit(KahaPrepareCommand command) throws IOException {
+    }
+
+    public void visit(KahaCommitCommand command) throws IOException {
+    }
+
+    public void visit(KahaAddMessageCommand command) throws IOException {
+    }
+
+    public void visit(KahaRemoveDestinationCommand command) throws IOException {
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/journal-data.proto?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/journal-data.proto (added)
+++ activemq/sandbox/kahadb/src/main/proto/journal-data.proto Thu Sep  4 13:42:26 2008
@@ -0,0 +1,134 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.kahadb.store.data;
+
+option java_multiple_files = true;
+option java_outer_classname = "JournalData";
+
+// Identifies the kind of command held by a journal record; one value per
+// command message declared in this file.
+enum KahaEntryType {
+  //| option java_create_message="true";
+  KAHA_TRACE_COMMAND = 0;
+  KAHA_ADD_MESSAGE_COMMAND = 1;
+  KAHA_REMOVE_MESSAGE_COMMAND = 2;
+  KAHA_PREPARE_COMMAND = 3;
+  KAHA_COMMIT_COMMAND = 4;
+  KAHA_ROLLBACK_COMMAND = 5;
+  KAHA_REMOVE_DESTINATION_COMMAND = 6;
+}
+
+// Journal record that only carries a free-form text message.
+message KahaTraceCommand {
+  // We make use of the wonky comment style below because the following options
+  // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+  // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaTraceCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+  
+  required string message = 1;
+}
+
+// Journal record for adding a message to a destination, optionally as part
+// of a transaction.
+message KahaAddMessageCommand {
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaAddMessageCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+  
+  optional KahaTransactionInfo transaction_info=1;
+  required KahaDestination destination = 2;
+  required string messageId = 3;
+  required bytes message = 4;
+}
+
+// Journal record for removing a message from a destination, optionally as
+// part of a transaction.
+message KahaRemoveMessageCommand {
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaRemoveMessageCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  optional KahaTransactionInfo transaction_info=1;
+  required KahaDestination destination = 2;
+  required string messageId = 3;
+}
+
+// Journal record marking the given transaction as prepared.
+message KahaPrepareCommand {
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaPrepareCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaTransactionInfo transaction_info=1;
+}
+
+// Journal record committing the given transaction.
+message KahaCommitCommand {
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaCommitCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaTransactionInfo transaction_info=1;
+}
+
+// Journal record rolling back the given transaction.
+message KahaRollbackCommand {
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaRollbackCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaTransactionInfo transaction_info=1;
+}
+
+// Journal record removing a whole destination.
+message KahaRemoveDestinationCommand {
+  //| option java_implments = "org.apache.kahadb.store.JournalCommand<KahaRemoveDestinationCommand>";
+  //| option java_visitor = "org.apache.kahadb.store.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaDestination destination = 1;
+}
+// A destination, identified by its type and its name.
+message KahaDestination {
+  enum DestinationType {
+    QUEUE = 0;
+    TOPIC = 1;
+    TEMP_QUEUE = 2;
+    TEMP_TOPIC = 3;
+  }
+
+  required DestinationType type = 1 [default = QUEUE];
+  required string name = 2;
+}
+
+// Identifies the transaction a command belongs to, either by a local or by an
+// XA transaction id.
+message KahaTransactionInfo {
+  optional KahaLocalTransactionId local_transaciton_id=1;
+  optional KahaXATransactionId xa_transaciton_id=2;
+  optional KahaLocation previous_entry=3;
+}
+
+// Id of a local transaction: the owning connection id plus a numeric value.
+message KahaLocalTransactionId {
+  required string connection_id=1;
+  // Field numbers must be unique within a message; this field previously
+  // reused number 1.
+  required int64 transaciton_id=2;
+}
+
+// Id of an XA transaction: format id, branch qualifier and global transaction id.
+message KahaXATransactionId {
+  required int32 format_id = 1;
+  required bytes branch_qualifier = 2;
+  required bytes global_transaction_id = 3;
+}
+
+// A position expressed as a log id and an offset.
+message KahaLocation {
+  required int32 log_id = 1;  
+  required int32 offset = 2;  
+}
+
+// TODO things to ponder:
+// Should we move more of the message fields that are set by the sender
+// (and rarely required by the broker) into the Properties object?

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaLoadTester.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaLoadTester.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaLoadTester.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaLoadTester.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProgressPrinter;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.memory.list.SimpleMessageList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Load test: sends persistent messages to a broker queue, consumes them back, and logs the messages/sec of each phase.
+ * @version $Revision$
+ */
+public class KahaLoadTester extends JmsTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(KahaLoadTester.class);
+    // Payload size in bytes of every message, and the number of messages produced and then consumed.
+    protected int messageSize = 1024 * 64;
+    protected int produceCount = 10000;
+    /** Builds the broker from the xbean configuration at org/apache/activemq/broker/store/loadtester.xml. */
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
+    }
+    /** Creates a factory that connects to the broker's first transport connector with async send enabled. */
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+        factory.setUseAsyncSend(true);
+        return factory;
+    }
+    /** Sends produceCount persistent messages to queue "TEST", then receives them all, failing if any receive times out. */
+    public void testQueueSendThenAddConsumer() throws Exception {
+        ProgressPrinter printer = new ProgressPrinter(produceCount, 20);
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        connection.setUseCompression(false);
+        connection.getPrefetchPolicy().setAll(10);
+        connection.start();
+        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        // The total byte count is computed in long arithmetic; as an int product it silently overflows for larger runs.
+        LOG.info("Sending " + produceCount + " messages that are " + (messageSize / 1024.0) + "k large, for a total of " + ((long)produceCount * messageSize / (1024.0 * 1024.0))
+                 + " megs of data.");
+        // Send produceCount messages to the broker, timing the whole batch.
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < produceCount; i++) {
+            printer.increment();
+            BytesMessage msg = session.createBytesMessage();
+            msg.writeBytes(new byte[messageSize]);
+            producer.send(msg);
+        }
+        long end1 = System.currentTimeMillis();
+
+        LOG.info("Produced messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
+        // Consume everything back on the same session; each receive waits up to 20000 ms before the assertion fails.
+        printer = new ProgressPrinter(produceCount, 10);
+        start = System.currentTimeMillis();
+        MessageConsumer consumer = session.createConsumer(destination);
+        for (int i = 0; i < produceCount; i++) {
+            printer.increment();
+            assertNotNull("Getting message: " + i, consumer.receive(20000));
+        }
+        end1 = System.currentTimeMillis();
+        LOG.info("Consumed messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
+
+    }
+    /** Builds the test suite for this class. */
+    public static Test suite() {
+        return suite(KahaLoadTester.class);
+    }
+    /** Runs the suite with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaLoadTester.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreBrokerTest.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreBrokerTest.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+
+/**
+ * Runs the BrokerTest cases against brokers built from plain broker:// URIs.
+ * Once the wire format is completed we can test against real persistence storage.
+ * @version $Revision$
+ */
+public class KahaStoreBrokerTest extends BrokerTest {
+    /** Creates the initial broker from a broker:// URI that sets deleteAllMessagesOnStartup=true. */
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?deleteAllMessagesOnStartup=true"));
+    }
+    /** Creates the restarted broker from the same URI without the deleteAllMessagesOnStartup option. */
+    protected BrokerService createRestartedBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost"));
+    }
+    /** Builds the test suite for this class. */
+    public static Test suite() {
+        return suite(KahaStoreBrokerTest.class);
+    }
+    /** Runs the suite with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreBrokerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.File;
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+
+
+/**
+ * Used to verify that recovery works correctly against the KahaDB persistence adaptor.
+ * Both brokers use the same store directory, target/activemq-data/kahadb.
+ * @version $Revision$
+ */
+public class KahaStoreRecoveryBrokerTest extends RecoveryBrokerTest {
+    /** Creates the initial broker on a KahaDB adaptor after calling deleteAllMessages() on it. */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBPersistenceAdaptor kaha = new KahaDBPersistenceAdaptor();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    /** Creates the restarted broker on the same directory, this time without calling deleteAllMessages(). */
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBPersistenceAdaptor kaha = new KahaDBPersistenceAdaptor();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    /** Builds the test suite for this class. */
+    public static Test suite() {
+        return suite(KahaStoreRecoveryBrokerTest.class);
+    }
+    /** Runs the suite with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreXARecoveryBrokerTest.java?rev=692231&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreXARecoveryBrokerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreXARecoveryBrokerTest.java Thu Sep  4 13:42:26 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.XARecoveryBrokerTest;
+
+/**
+ * Used to verify that XA recovery works correctly against brokers built from plain broker:// URIs.
+ * NOTE(review): unlike KahaStoreRecoveryBrokerTest, no KahaDBPersistenceAdaptor is configured here - confirm this is intended.
+ * @version $Revision$
+ */
+public class KahaStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
+    /** Builds the test suite for this class. */
+    public static Test suite() {
+        return suite(KahaStoreXARecoveryBrokerTest.class);
+    }
+    /** Runs the suite with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+    /** Creates the initial broker from a broker:// URI that sets deleteAllMessagesOnStartup=true. */
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?deleteAllMessagesOnStartup=true"));
+    }
+    /** Creates the restarted broker from the same URI without the deleteAllMessagesOnStartup option. */
+    protected BrokerService createRestartedBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost"));
+    }
+    
+}

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreXARecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message