activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1170201 [2/3] - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/filter/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/broker/ activemq-core/src/test/java/o...
Date Tue, 13 Sep 2011 15:01:38 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Sep 13 15:01:37 2011
@@ -18,7 +18,15 @@ package org.apache.activemq.store.kahadb
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,19 +36,29 @@ import java.util.concurrent.locks.Reentr
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.*;
+import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaEntryType;
+import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
+import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.kahadb.util.LocationMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.DataFile;
@@ -49,9 +67,16 @@ import org.apache.kahadb.journal.Locatio
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LockFile;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
 
 public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 
@@ -61,11 +86,9 @@ public class MessageDatabase extends Ser
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
 
     protected static final Buffer UNMATCHED;
-
     static {
         UNMATCHED = new Buffer(new byte[]{});
     }
-
     private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
     private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
@@ -85,7 +108,6 @@ public class MessageDatabase extends Ser
         protected Location producerSequenceIdTrackerLocation = null;
         protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
         protected int version = VERSION;
-
         public void read(DataInput is) throws IOException {
             state = is.readInt();
             destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
@@ -108,9 +130,9 @@ public class MessageDatabase extends Ser
             } catch (EOFException expectedOnUpgrade) {
             }
             try {
-                version = is.readInt();
-            } catch (EOFException expectedOnUpgrade) {
-                version = 1;
+               version = is.readInt();
+            }catch (EOFException expectedOnUpgrade) {
+                version=1;
             }
             LOG.info("KahaDB is version " + version);
         }
@@ -156,7 +178,7 @@ public class MessageDatabase extends Ser
     }
 
     protected PageFile pageFile;
-    protected JournalManager journalManager;
+    protected Journal journal;
     protected Metadata metadata = new Metadata();
 
     protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -166,12 +188,12 @@ public class MessageDatabase extends Ser
     protected boolean deleteAllMessages;
     protected File directory = new File("KahaDB");
     protected Thread checkpointThread;
-    protected boolean enableJournalDiskSyncs = true;
+    protected boolean enableJournalDiskSyncs=true;
     protected boolean archiveDataLogs;
     protected File directoryArchive;
     protected AtomicLong storeSize = new AtomicLong(0);
-    long checkpointInterval = 5 * 1000;
-    long cleanupInterval = 30 * 1000;
+    long checkpointInterval = 5*1000;
+    long cleanupInterval = 30*1000;
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
@@ -187,8 +209,6 @@ public class MessageDatabase extends Ser
     private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
     protected boolean forceRecoverIndex = false;
     private final Object checkpointThreadLock = new Object();
-    private boolean journalPerDestination = false;
-
 
     public MessageDatabase() {
     }
@@ -235,15 +255,15 @@ public class MessageDatabase extends Ser
             storedDestinations.clear();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext(); ) {
+                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
                         Entry<String, StoredDestination> entry = iterator.next();
-                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions != null);
+                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
                         storedDestinations.put(entry.getKey(), sd);
                     }
                 }
             });
             pageFile.flush();
-        } finally {
+        }finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -270,11 +290,11 @@ public class MessageDatabase extends Ser
                             while (opened.get()) {
                                 Thread.sleep(sleepTime);
                                 long now = System.currentTimeMillis();
-                                if (now - lastCleanup >= cleanupInterval) {
+                                if( now - lastCleanup >= cleanupInterval ) {
                                     checkpointCleanup(true);
                                     lastCleanup = now;
                                     lastCheckpoint = now;
-                                } else if (now - lastCheckpoint >= checkpointInterval) {
+                                } else if( now - lastCheckpoint >= checkpointInterval ) {
                                     checkpointCleanup(false);
                                     lastCheckpoint = now;
                                 }
@@ -295,8 +315,8 @@ public class MessageDatabase extends Ser
     }
 
     public void open() throws IOException {
-        if (opened.compareAndSet(false, true)) {
-            getJournalManager().start();
+        if( opened.compareAndSet(false, true) ) {
+            getJournal().start();
             loadPageFile();
             startCheckpoint();
             recover();
@@ -348,20 +368,18 @@ public class MessageDatabase extends Ser
         try {
             lock();
             if (deleteAllMessages) {
-                getJournalManager().start();
-                getJournalManager().delete();
-                getJournalManager().close();
-                journalManager = null;
+                getJournal().start();
+                getJournal().delete();
+                getJournal().close();
+                journal = null;
                 getPageFile().delete();
                 LOG.info("Persistence store purged.");
                 deleteAllMessages = false;
             }
 
             open();
-            for (Journal journal : getJournalManager().getJournals()) {
-                store(journal, new KahaTraceCommand().setMessage("LOADED " + new Date()));
-            }
-        } finally {
+            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+        }finally {
             this.indexLock.writeLock().unlock();
         }
 
@@ -369,34 +387,32 @@ public class MessageDatabase extends Ser
 
 
     public void close() throws IOException, InterruptedException {
-        if (opened.compareAndSet(true, false)) {
+        if( opened.compareAndSet(true, false)) {
             this.indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        for (Journal journal : getJournalManager().getJournals()) {
-                            checkpointUpdate(tx, journal, true);
-                        }
+                        checkpointUpdate(tx, true);
                     }
                 });
                 pageFile.unload();
                 metadata = new Metadata();
-            } finally {
+            }finally {
                 this.indexLock.writeLock().unlock();
             }
-            journalManager.close();
+            journal.close();
             synchronized (checkpointThreadLock) {
                 checkpointThread.join();
             }
             lockFile.unlock();
-            lockFile = null;
+            lockFile=null;
         }
     }
 
     public void unload() throws IOException, InterruptedException {
         this.indexLock.writeLock().lock();
         try {
-            if (pageFile != null && pageFile.isLoaded()) {
+            if( pageFile != null && pageFile.isLoaded() ) {
                 metadata.state = CLOSED_STATE;
                 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
 
@@ -406,7 +422,7 @@ public class MessageDatabase extends Ser
                     }
                 });
             }
-        } finally {
+        }finally {
             this.indexLock.writeLock().unlock();
         }
         close();
@@ -417,12 +433,22 @@ public class MessageDatabase extends Ser
         Location l = null;
         synchronized (inflightTransactions) {
             if (!inflightTransactions.isEmpty()) {
-                l = inflightTransactions.values().iterator().next().get(0).getLocation();
+                for (List<Operation> ops : inflightTransactions.values()) {
+                    if (!ops.isEmpty()) {
+                        l = ops.get(0).getLocation();
+                        break;
+                    }
+                }
             }
             if (!preparedTransactions.isEmpty()) {
-                Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
-                if (l == null || t.compareTo(l) <= 0) {
-                    l = t;
+                for (List<Operation> ops : preparedTransactions.values()) {
+                    if (!ops.isEmpty()) {
+                        Location t = ops.get(0).getLocation();
+                        if (l==null || t.compareTo(l) <= 0) {
+                            l = t;
+                        }
+                        break;
+                    }
                 }
             }
         }
@@ -432,65 +458,67 @@ public class MessageDatabase extends Ser
     /**
      * Move all the messages that were in the journal into long term storage. We
      * just replay and do a checkpoint.
+     *
+     * @throws IOException
+     * @throws IOException
+     * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
         this.indexLock.writeLock().lock();
         try {
-            for (Journal journal : getJournalManager().getJournals()) {
-                recover(journal);
-            }
-        } finally {
-            this.indexLock.writeLock().unlock();
-        }
-    }
-
-    private void recover(final Journal journal) throws IllegalStateException, IOException {
 
-        long start = System.currentTimeMillis();
-        Location producerAuditPosition = recoverProducerAudit(journal);
-        Location lastIndoubtPosition = getRecoveryPosition(journal);
+            long start = System.currentTimeMillis();
+            Location producerAuditPosition = recoverProducerAudit();
+            Location lastIndoubtPosition = getRecoveryPosition();
 
-        Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+            Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
 
-        if (recoveryPosition != null) {
-            int redoCounter = 0;
-            LOG.info("Recovering from the journal ...");
-            while (recoveryPosition != null) {
-                JournalCommand<?> message = load(journal, recoveryPosition);
-                metadata.lastUpdate = recoveryPosition;
-                process(message, recoveryPosition, lastIndoubtPosition);
-                redoCounter++;
-                recoveryPosition = journal.getNextLocation(recoveryPosition);
+            if (recoveryPosition != null) {
+                int redoCounter = 0;
+                LOG.info("Recovering from the journal ...");
+                while (recoveryPosition != null) {
+                    JournalCommand<?> message = load(recoveryPosition);
+                    metadata.lastUpdate = recoveryPosition;
+                    process(message, recoveryPosition, lastIndoubtPosition);
+                    redoCounter++;
+                    recoveryPosition = journal.getNextLocation(recoveryPosition);
+                }
+                long end = System.currentTimeMillis();
+                LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
             }
-            long end = System.currentTimeMillis();
-            LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
-        }
 
-        // We may have to undo some index updates.
-        pageFile.tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                recoverIndex(tx, journal);
-            }
-        });
+            // We may have to undo some index updates.
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    recoverIndex(tx);
+                }
+            });
 
-        // rollback any recovered inflight local transactions
-        Set<TransactionId> toRollback = new HashSet<TransactionId>();
-        synchronized (inflightTransactions) {
-            for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
-                TransactionId id = it.next();
-                if (id.isLocalTransaction()) {
-                    toRollback.add(id);
+            // rollback any recovered inflight local transactions
+            Set<TransactionId> toRollback = new HashSet<TransactionId>();
+            synchronized (inflightTransactions) {
+                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
+                    TransactionId id = it.next();
+                    if (id.isLocalTransaction()) {
+                        toRollback.add(id);
+                    }
+                }
+                for (TransactionId tx: toRollback) {
+                    LOG.debug("rolling back recovered indoubt local transaction " + tx);
+                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                 }
             }
-            for (TransactionId tx : toRollback) {
-                LOG.debug("rolling back recovered indoubt local transaction " + tx);
-                store(journal, new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null);
-            }
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
     }
 
+    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
+        return TransactionIdConversion.convertToLocal(tx);
+    }
+
     private Location minimum(Location producerAuditPosition,
-                             Location lastIndoubtPosition) {
+            Location lastIndoubtPosition) {
         Location min = null;
         if (producerAuditPosition != null) {
             min = producerAuditPosition;
@@ -503,9 +531,9 @@ public class MessageDatabase extends Ser
         return min;
     }
 
-    private Location recoverProducerAudit(Journal journal) throws IOException {
+    private Location recoverProducerAudit() throws IOException {
         if (metadata.producerSequenceIdTrackerLocation != null) {
-            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(journal, metadata.producerSequenceIdTrackerLocation);
+            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
             try {
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
@@ -520,12 +548,12 @@ public class MessageDatabase extends Ser
         }
     }
 
-    protected void recoverIndex(Transaction tx, Journal journal) throws IOException {
+    protected void recoverIndex(Transaction tx) throws IOException {
         long start = System.currentTimeMillis();
         // It is possible index updates got applied before the journal updates..
         // in that case we need to removed references to messages that are not in the journal
         final Location lastAppendLocation = journal.getLastAppendLocation();
-        long undoCounter = 0;
+        long undoCounter=0;
 
         // Go through all the destinations to see if they have messages past the lastAppendLocation
         for (StoredDestination sd : storedDestinations.values()) {
@@ -551,7 +579,7 @@ public class MessageDatabase extends Ser
         }
 
         long end = System.currentTimeMillis();
-        if (undoCounter > 0) {
+        if( undoCounter > 0 ) {
             // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
             // should do sync writes to the journal.
             LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
@@ -567,12 +595,12 @@ public class MessageDatabase extends Ser
         for (StoredDestination sd : storedDestinations.values()) {
             // Use a visitor to cut down the number of pages that we load
             sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-                int last = -1;
+                int last=-1;
 
                 public boolean isInterestedInKeysBetween(Location first, Location second) {
-                    if (first == null) {
+                    if( first==null ) {
                         return !ss.contains(0, second.getDataFileId());
-                    } else if (second == null) {
+                    } else if( second==null ) {
                         return true;
                     } else {
                         return !ss.contains(first.getDataFileId(), second.getDataFileId());
@@ -582,7 +610,7 @@ public class MessageDatabase extends Ser
                 public void visit(List<Location> keys, List<Long> values) {
                     for (Location l : keys) {
                         int fileId = l.getDataFileId();
-                        if (last != fileId) {
+                        if( last != fileId ) {
                             ss.add(fileId);
                             last = fileId;
                         }
@@ -592,34 +620,34 @@ public class MessageDatabase extends Ser
             });
         }
         HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
-        while (!ss.isEmpty()) {
-            missingJournalFiles.add((int) ss.removeFirst());
+        while( !ss.isEmpty() ) {
+            missingJournalFiles.add( (int)ss.removeFirst() );
         }
-        missingJournalFiles.removeAll(journal.getFileMap().keySet());
+        missingJournalFiles.removeAll( journal.getFileMap().keySet() );
 
-        if (!missingJournalFiles.isEmpty()) {
-            LOG.info("Some journal files are missing: " + missingJournalFiles);
+        if( !missingJournalFiles.isEmpty() ) {
+            LOG.info("Some journal files are missing: "+missingJournalFiles);
         }
 
         ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
         for (Integer missing : missingJournalFiles) {
-            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
+            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
         }
 
-        if (checkForCorruptJournalFiles) {
+        if ( checkForCorruptJournalFiles ) {
             Collection<DataFile> dataFiles = journal.getFileMap().values();
             for (DataFile dataFile : dataFiles) {
                 int id = dataFile.getDataFileId();
-                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
+                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
                 Sequence seq = dataFile.getCorruptedBlocks().getHead();
-                while (seq != null) {
-                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
+                while( seq!=null ) {
+                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
                     seq = seq.getNext();
                 }
             }
         }
 
-        if (!missingPredicates.isEmpty()) {
+        if( !missingPredicates.isEmpty() ) {
             for (StoredDestination sd : storedDestinations.values()) {
 
                 final ArrayList<Long> matches = new ArrayList<Long>();
@@ -631,11 +659,11 @@ public class MessageDatabase extends Ser
                 });
 
                 // If somes message references are affected by the missing data files...
-                if (!matches.isEmpty()) {
+                if( !matches.isEmpty() ) {
 
                     // We either 'gracefully' recover dropping the missing messages or
                     // we error out.
-                    if (ignoreMissingJournalfiles) {
+                    if( ignoreMissingJournalfiles ) {
                         // Update the index to remove the references to the missing data
                         for (Long sequenceId : matches) {
                             MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
@@ -646,14 +674,14 @@ public class MessageDatabase extends Ser
                         }
 
                     } else {
-                        throw new IOException("Detected missing/corrupt journal files. " + matches.size() + " messages affected.");
+                        throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
                     }
                 }
             }
         }
 
         end = System.currentTimeMillis();
-        if (undoCounter > 0) {
+        if( undoCounter > 0 ) {
             // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
             // should do sync writes to the journal.
             LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
@@ -663,12 +691,12 @@ public class MessageDatabase extends Ser
     private Location nextRecoveryPosition;
     private Location lastRecoveryPosition;
 
-    public void incrementalRecover(Journal journal) throws IOException {
+    public void incrementalRecover() throws IOException {
         this.indexLock.writeLock().lock();
         try {
-            if (nextRecoveryPosition == null) {
-                if (lastRecoveryPosition == null) {
-                    nextRecoveryPosition = getRecoveryPosition(journal);
+            if( nextRecoveryPosition == null ) {
+                if( lastRecoveryPosition==null ) {
+                    nextRecoveryPosition = getRecoveryPosition();
                 } else {
                     nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
                 }
@@ -676,11 +704,11 @@ public class MessageDatabase extends Ser
             while (nextRecoveryPosition != null) {
                 lastRecoveryPosition = nextRecoveryPosition;
                 metadata.lastUpdate = lastRecoveryPosition;
-                JournalCommand<?> message = load(journal, lastRecoveryPosition);
+                JournalCommand<?> message = load(lastRecoveryPosition);
                 process(message, lastRecoveryPosition, (Runnable)null);
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
-        } finally {
+        }finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -689,7 +717,7 @@ public class MessageDatabase extends Ser
         return metadata.lastUpdate;
     }
 
-    private Location getRecoveryPosition(Journal journal) throws IOException {
+    private Location getRecoveryPosition() throws IOException {
 
         if (!this.forceRecoverIndex) {
 
@@ -699,7 +727,7 @@ public class MessageDatabase extends Ser
             }
 
             // Perhaps there were no transactions...
-            if (metadata.lastUpdate != null) {
+            if( metadata.lastUpdate!=null) {
                 // Start replay at the record after the last one recorded in the index file.
                 return journal.getNextLocation(metadata.lastUpdate);
             }
@@ -709,44 +737,38 @@ public class MessageDatabase extends Ser
     }
 
     protected void checkpointCleanup(final boolean cleanup) throws IOException {
-        for (Journal journal : getJournalManager().getJournals()) {
-            checkpointCleanup(journal, cleanup);
-        }
-    }
-
-    protected void checkpointCleanup(final Journal journal, final boolean cleanup) throws IOException {
         long start;
         this.indexLock.writeLock().lock();
         try {
             start = System.currentTimeMillis();
-            if (!opened.get()) {
+            if( !opened.get() ) {
                 return;
             }
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    checkpointUpdate(tx, journal, cleanup);
+                    checkpointUpdate(tx, cleanup);
                 }
             });
-        } finally {
+        }finally {
             this.indexLock.writeLock().unlock();
         }
         long end = System.currentTimeMillis();
-        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
-            LOG.info("Slow KahaDB access: cleanup took " + (end - start));
+        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+            LOG.info("Slow KahaDB access: cleanup took "+(end-start));
         }
     }
 
 
-    public void checkpoint(final Journal journal, Callback closure) throws Exception {
+    public void checkpoint(Callback closure) throws Exception {
         this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    checkpointUpdate(tx, journal, false);
+                    checkpointUpdate(tx, false);
                 }
             });
             closure.execute();
-        } finally {
+        }finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -754,18 +776,17 @@ public class MessageDatabase extends Ser
     // /////////////////////////////////////////////////////////////////
     // Methods call by the broker to update and query the store.
     // /////////////////////////////////////////////////////////////////
-    public Location store(Journal journal, JournalCommand<?> data) throws IOException {
-        return store(journal, data, false, null, null);
+    public Location store(JournalCommand<?> data) throws IOException {
+        return store(data, false, null,null);
     }
 
-
     /**
      * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
      * during a recovery process.
      */
-    public Location store(final Journal journal, JournalCommand<?> data, boolean sync, Runnable before, Runnable after) throws IOException {
+    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
         if (before != null) {
             before.run();
         }
@@ -780,8 +801,8 @@ public class MessageDatabase extends Ser
             long start2 = System.currentTimeMillis();
             process(data, location, after);
             long end = System.currentTimeMillis();
-            if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
-                LOG.info("Slow KahaDB access: Journal append took: " + (start2 - start) + " ms, Index update took " + (end - start2) + " ms");
+            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+                LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
             }
 
             if (after != null) {
@@ -812,27 +833,35 @@ public class MessageDatabase extends Ser
 
     /**
      * Loads a previously stored JournalMessage
+     *
+     * @param location
+     * @return
+     * @throws IOException
      */
-    public JournalCommand<?> load(Journal journal, Location location) throws IOException {
+    public JournalCommand<?> load(Location location) throws IOException {
         long start = System.currentTimeMillis();
         ByteSequence data = journal.read(location);
         long end = System.currentTimeMillis();
-        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
-            LOG.info("Slow KahaDB access: Journal read took: " + (end - start) + " ms");
+        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+            LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
         }
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         byte readByte = is.readByte();
         KahaEntryType type = KahaEntryType.valueOf(readByte);
-        if (type == null) {
-            throw new IOException("Could not load journal record. Invalid location: " + location);
+        if( type == null ) {
+            throw new IOException("Could not load journal record. Invalid location: "+location);
         }
-        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
+        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
         message.mergeFramed(is);
         return message;
     }
 
     /**
      * do minimal recovery till we reach the last inDoubtLocation
+     * @param data
+     * @param location
+     * @param inDoubtlocation
+     * @throws IOException
      */
     void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
@@ -914,7 +943,7 @@ public class MessageDatabase extends Ser
                         upadateIndex(tx, command, location);
                     }
                 });
-            } finally {
+            }finally {
                 this.indexLock.writeLock().unlock();
             }
         }
@@ -922,8 +951,8 @@ public class MessageDatabase extends Ser
 
     protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-            inflightTx.add(new RemoveOpperation(command, location));
+           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+           inflightTx.add(new RemoveOpperation(command, location));
         } else {
             this.indexLock.writeLock().lock();
             try {
@@ -932,7 +961,7 @@ public class MessageDatabase extends Ser
                         updateIndex(tx, command, location);
                     }
                 });
-            } finally {
+            }finally {
                 this.indexLock.writeLock().unlock();
             }
         }
@@ -947,7 +976,7 @@ public class MessageDatabase extends Ser
                     updateIndex(tx, command, location);
                 }
             });
-        } finally {
+        }finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -960,7 +989,7 @@ public class MessageDatabase extends Ser
                     updateIndex(tx, command, location);
                 }
             });
-        } finally {
+        }finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -984,7 +1013,7 @@ public class MessageDatabase extends Ser
     }
 
     protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
-        TransactionId key = key(command.getTransactionInfo());
+        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
         List<Operation> inflightTx;
         synchronized (inflightTransactions) {
             inflightTx = inflightTransactions.remove(key);
@@ -1014,7 +1043,7 @@ public class MessageDatabase extends Ser
     }
 
     protected void process(KahaPrepareCommand command, Location location) {
-        TransactionId key = key(command.getTransactionInfo());
+        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
         synchronized (inflightTransactions) {
             List<Operation> tx = inflightTransactions.remove(key);
             if (tx != null) {
@@ -1024,7 +1053,7 @@ public class MessageDatabase extends Ser
     }
 
     protected void process(KahaRollbackCommand command, Location location) {
-        TransactionId key = key(command.getTransactionInfo());
+        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
         synchronized (inflightTransactions) {
             List<Operation> tx = inflightTransactions.remove(key);
             if (tx == null) {
@@ -1119,7 +1148,6 @@ public class MessageDatabase extends Ser
     }
 
     Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
-
     private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
         Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
         if (referenceFileIds == null) {
@@ -1172,9 +1200,9 @@ public class MessageDatabase extends Ser
         // If set then we are creating it.. otherwise we are destroying the sub
         if (command.hasSubscriptionInfo()) {
             sd.subscriptions.put(tx, subscriptionKey, command);
-            long ackLocation = NOT_ACKED;
+            long ackLocation=NOT_ACKED;
             if (!command.getRetroactive()) {
-                ackLocation = sd.orderIndex.nextMessageId - 1;
+                ackLocation = sd.orderIndex.nextMessageId-1;
             } else {
                 addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
             }
@@ -1191,19 +1219,19 @@ public class MessageDatabase extends Ser
      * @param tx
      * @throws IOException
      */
-    void checkpointUpdate(Transaction tx, Journal journal, boolean cleanup) throws IOException {
+    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
         LOG.debug("Checkpoint started.");
 
         // reflect last update exclusive of current checkpoint
         Location firstTxLocation = metadata.lastUpdate;
 
         metadata.state = OPEN_STATE;
-        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(journal);
+        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
         metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
         tx.store(metadata.page, metadataMarshaller, true);
         pageFile.flush();
 
-        if (cleanup) {
+        if( cleanup ) {
 
             final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
             final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
@@ -1211,7 +1239,7 @@ public class MessageDatabase extends Ser
             LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
 
             // Don't GC files under replication
-            if (journalFilesBeingReplicated != null) {
+            if( journalFilesBeingReplicated!=null ) {
                 gcCandidateSet.removeAll(journalFilesBeingReplicated);
             }
 
@@ -1220,16 +1248,16 @@ public class MessageDatabase extends Ser
             }
 
             // Don't GC files after the first in progress tx
-            if (metadata.firstInProgressTransactionLocation != null) {
+            if( metadata.firstInProgressTransactionLocation!=null ) {
                 if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
                     firstTxLocation = metadata.firstInProgressTransactionLocation;
                 }
             }
 
-            if (firstTxLocation != null) {
-                while (!gcCandidateSet.isEmpty()) {
+            if( firstTxLocation!=null ) {
+                while( !gcCandidateSet.isEmpty() ) {
                     Integer last = gcCandidateSet.last();
-                    if (last >= firstTxLocation.getDataFileId()) {
+                    if( last >= firstTxLocation.getDataFileId() ) {
                         gcCandidateSet.remove(last);
                     } else {
                         break;
@@ -1240,33 +1268,32 @@ public class MessageDatabase extends Ser
 
             // Go through all the destinations to see if any of them can remove GC candidates.
             for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
-                if (gcCandidateSet.isEmpty()) {
+                if( gcCandidateSet.isEmpty() ) {
                     break;
                 }
 
                 // Use a visitor to cut down the number of pages that we load
                 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-                    int last = -1;
-
+                    int last=-1;
                     public boolean isInterestedInKeysBetween(Location first, Location second) {
-                        if (first == null) {
-                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId() + 1);
-                            if (!subset.isEmpty() && subset.last() == second.getDataFileId()) {
+                        if( first==null ) {
+                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                 subset.remove(second.getDataFileId());
                             }
                             return !subset.isEmpty();
-                        } else if (second == null) {
+                        } else if( second==null ) {
                             SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
-                            if (!subset.isEmpty() && subset.first() == first.getDataFileId()) {
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                 subset.remove(first.getDataFileId());
                             }
                             return !subset.isEmpty();
                         } else {
-                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId() + 1);
-                            if (!subset.isEmpty() && subset.first() == first.getDataFileId()) {
+                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                 subset.remove(first.getDataFileId());
                             }
-                            if (!subset.isEmpty() && subset.last() == second.getDataFileId()) {
+                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                 subset.remove(second.getDataFileId());
                             }
                             return !subset.isEmpty();
@@ -1276,7 +1303,7 @@ public class MessageDatabase extends Ser
                     public void visit(List<Location> keys, List<Long> values) {
                         for (Location l : keys) {
                             int fileId = l.getDataFileId();
-                            if (last != fileId) {
+                            if( last != fileId ) {
                                 gcCandidateSet.remove(fileId);
                                 last = fileId;
                             }
@@ -1310,8 +1337,8 @@ public class MessageDatabase extends Ser
                 }
             }
 
-            if (!gcCandidateSet.isEmpty()) {
-                LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
+            if( !gcCandidateSet.isEmpty() ) {
+                LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
                 journal.removeDataFiles(gcCandidateSet);
             }
         }
@@ -1319,13 +1346,13 @@ public class MessageDatabase extends Ser
         LOG.debug("Checkpoint done.");
     }
 
-    private Location checkpointProducerAudit(Journal journal) throws IOException {
+    private Location checkpointProducerAudit() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oout = new ObjectOutputStream(baos);
         oout.writeObject(metadata.producerSequenceIdTracker);
         oout.flush();
         oout.close();
-        return store(journal, new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
+        return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
     }
 
     public HashSet<Integer> getJournalFilesBeingReplicated() {
@@ -1351,13 +1378,13 @@ public class MessageDatabase extends Ser
         final Location location;
 
         public MessageKeys(String messageId, Location location) {
-            this.messageId = messageId;
-            this.location = location;
+            this.messageId=messageId;
+            this.location=location;
         }
 
         @Override
         public String toString() {
-            return "[" + messageId + "," + location + "]";
+            return "["+messageId+","+location+"]";
         }
     }
 
@@ -1473,20 +1500,20 @@ public class MessageDatabase extends Ser
                 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
                 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
             } else {
-                // upgrade
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                        value.orderIndex.lowPriorityIndex.load(tx);
-
-                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                        value.orderIndex.highPriorityIndex.load(tx);
-                    }
-                });
+                    // upgrade
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                            value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                            value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                            value.orderIndex.lowPriorityIndex.load(tx);
+
+                            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                            value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                            value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                            value.orderIndex.highPriorityIndex.load(tx);
+                        }
+                    });
             }
 
             return value;
@@ -1514,12 +1541,12 @@ public class MessageDatabase extends Ser
 
         public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
             KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
-            rc.mergeFramed((InputStream) dataIn);
+            rc.mergeFramed((InputStream)dataIn);
             return rc;
         }
 
         public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
-            object.writeFramed((OutputStream) dataOut);
+            object.writeFramed((OutputStream)dataOut);
         }
     }
 
@@ -1609,7 +1636,7 @@ public class MessageDatabase extends Ser
                 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
                     Entry<String, LastAck> entry = iterator.next();
                     for (Iterator<Entry<Long, MessageKeys>> orderIterator =
-                                 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
+                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
                         Long sequence = orderIterator.next().getKey();
                         addAckLocation(tx, rc, sequence, entry.getKey());
                     }
@@ -1621,18 +1648,18 @@ public class MessageDatabase extends Ser
             if (rc.orderIndex.nextMessageId == 0) {
                 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
                 if (!rc.subscriptionAcks.isEmpty(tx)) {
-                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
+                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
                         Entry<String, LastAck> entry = iterator.next();
                         rc.orderIndex.nextMessageId =
-                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence + 1);
+                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
                     }
                 }
             } else {
                 // update based on ackPositions for unmatched, last entry is always the next
                 if (!rc.ackPositions.isEmpty(tx)) {
-                    Entry<Long, HashSet<String>> last = rc.ackPositions.getLast(tx);
+                    Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
                     rc.orderIndex.nextMessageId =
-                            Math.max(rc.orderIndex.nextMessageId, last.getKey());
+                        Math.max(rc.orderIndex.nextMessageId, last.getKey());
                 }
             }
 
@@ -1665,17 +1692,16 @@ public class MessageDatabase extends Ser
     }
 
     final HashSet nextMessageIdMarker = new HashSet<String>();
-
     // on a new message add, all existing subs are interested in this message
     private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
         HashSet hs = new HashSet<String>();
-        for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
+        for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
             Entry<String, LastAck> entry = iterator.next();
             hs.add(entry.getKey());
         }
         sd.ackPositions.put(tx, messageSequence, hs);
         // add empty next to keep track of nextMessage
-        sd.ackPositions.put(tx, messageSequence + 1, nextMessageIdMarker);
+        sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
     }
 
     private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
@@ -1729,11 +1755,11 @@ public class MessageDatabase extends Ser
     // /////////////////////////////////////////////////////////////////
     // Transaction related implementation methods.
     // /////////////////////////////////////////////////////////////////
-    protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
+    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
     protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
     protected final Set<String> ackedAndPrepared = new HashSet<String>();
 
-    // messages that have prepared (pending) acks cannot be redispatched unless the outcome is rollback,
+    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
     // till then they are skipped by the store.
     // 'at most once' XA guarantee
     public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
@@ -1761,7 +1787,7 @@ public class MessageDatabase extends Ser
     }
 
     private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
-        TransactionId key = key(info);
+        TransactionId key = TransactionIdConversion.convert(info);
         List<Operation> tx;
         synchronized (inflightTransactions) {
             tx = inflightTransactions.get(key);
@@ -1774,20 +1800,7 @@ public class MessageDatabase extends Ser
     }
 
     private TransactionId key(KahaTransactionInfo transactionInfo) {
-        if (transactionInfo.hasLocalTransacitonId()) {
-            KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
-            LocalTransactionId rc = new LocalTransactionId();
-            rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
-            rc.setValue(tx.getTransacitonId());
-            return rc;
-        } else {
-            KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
-            XATransactionId rc = new XATransactionId();
-            rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
-            rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
-            rc.setFormatId(tx.getFormatId());
-            return rc;
-        }
+        return TransactionIdConversion.convert(transactionInfo);
     }
 
     abstract class Operation {
@@ -1852,15 +1865,15 @@ public class MessageDatabase extends Ser
         return index;
     }
 
-    private JournalManager createJournalManager() throws IOException {
-        JournalManager manager = isJournalPerDestination() ? new DestinationJournalManager() : new DefaultJournalManager();
+    private Journal createJournal() throws IOException {
+        Journal manager = new Journal();
         manager.setDirectory(directory);
         manager.setMaxFileLength(getJournalMaxFileLength());
         manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
         manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
         manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
         manager.setArchiveDataLogs(isArchiveDataLogs());
-        manager.setStoreSize(storeSize);
+        manager.setSizeAccumulator(storeSize);
         if (getDirectoryArchive() != null) {
             IOHelper.mkdirs(getDirectoryArchive());
             manager.setDirectoryArchive(getDirectoryArchive());
@@ -1963,15 +1976,11 @@ public class MessageDatabase extends Ser
         return pageFile;
     }
 
-    public JournalManager getJournalManager() throws IOException {
-        if (journalManager == null) {
-            journalManager = createJournalManager();
+    public Journal getJournal() throws IOException {
+        if (journal == null) {
+            journal = createJournal();
         }
-        return journalManager;
-    }
-
-    public Journal getJournal(ActiveMQDestination destination) throws IOException {
-        return getJournalManager().getJournal(destination);
+        return journal;
     }
 
     public boolean isFailIfDatabaseIsLocked() {
@@ -2060,59 +2069,27 @@ public class MessageDatabase extends Ser
         this.databaseLockedWaitDelay = databaseLockedWaitDelay;
     }
 
-    public boolean isJournalPerDestination() {
-        return journalPerDestination;
-    }
-
-    public void setJournalPerDestination(boolean journalPerDestination) {
-        this.journalPerDestination = journalPerDestination;
-    }
-
     // /////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     // /////////////////////////////////////////////////////////////////
 
-    KahaTransactionInfo createTransactionInfo(TransactionId txid) {
-        if (txid == null) {
-            return null;
-        }
-        KahaTransactionInfo rc = new KahaTransactionInfo();
-
-        if (txid.isLocalTransaction()) {
-            LocalTransactionId t = (LocalTransactionId) txid;
-            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
-            kahaTxId.setConnectionId(t.getConnectionId().getValue());
-            kahaTxId.setTransacitonId(t.getValue());
-            rc.setLocalTransacitonId(kahaTxId);
-        } else {
-            XATransactionId t = (XATransactionId) txid;
-            KahaXATransactionId kahaTxId = new KahaXATransactionId();
-            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
-            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
-            kahaTxId.setFormatId(t.getFormatId());
-            rc.setXaTransacitonId(kahaTxId);
-        }
-        return rc;
-    }
-
-    class MessageOrderCursor {
+    class MessageOrderCursor{
         long defaultCursorPosition;
         long lowPriorityCursorPosition;
         long highPriorityCursorPosition;
-
-        MessageOrderCursor() {
+        MessageOrderCursor(){
         }
 
-        MessageOrderCursor(long position) {
-            this.defaultCursorPosition = position;
-            this.lowPriorityCursorPosition = position;
-            this.highPriorityCursorPosition = position;
+        MessageOrderCursor(long position){
+            this.defaultCursorPosition=position;
+            this.lowPriorityCursorPosition=position;
+            this.highPriorityCursorPosition=position;
         }
 
-        MessageOrderCursor(MessageOrderCursor other) {
-            this.defaultCursorPosition = other.defaultCursorPosition;
-            this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
-            this.highPriorityCursorPosition = other.highPriorityCursorPosition;
+        MessageOrderCursor(MessageOrderCursor other){
+            this.defaultCursorPosition=other.defaultCursorPosition;
+            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
+            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
         }
 
         MessageOrderCursor copy() {
@@ -2120,33 +2097,33 @@ public class MessageDatabase extends Ser
         }
 
         void reset() {
-            this.defaultCursorPosition = 0;
-            this.highPriorityCursorPosition = 0;
-            this.lowPriorityCursorPosition = 0;
+            this.defaultCursorPosition=0;
+            this.highPriorityCursorPosition=0;
+            this.lowPriorityCursorPosition=0;
         }
 
         void increment() {
-            if (defaultCursorPosition != 0) {
+            if (defaultCursorPosition!=0) {
                 defaultCursorPosition++;
             }
-            if (highPriorityCursorPosition != 0) {
+            if (highPriorityCursorPosition!=0) {
                 highPriorityCursorPosition++;
             }
-            if (lowPriorityCursorPosition != 0) {
+            if (lowPriorityCursorPosition!=0) {
                 lowPriorityCursorPosition++;
             }
         }
 
         public String toString() {
-            return "MessageOrderCursor:[def:" + defaultCursorPosition
-                    + ", low:" + lowPriorityCursorPosition
-                    + ", high:" + highPriorityCursorPosition + "]";
+           return "MessageOrderCursor:[def:" + defaultCursorPosition
+                   + ", low:" + lowPriorityCursorPosition
+                   + ", high:" +  highPriorityCursorPosition + "]";
         }
 
         public void sync(MessageOrderCursor other) {
-            this.defaultCursorPosition = other.defaultCursorPosition;
-            this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
-            this.highPriorityCursorPosition = other.highPriorityCursorPosition;
+            this.defaultCursorPosition=other.defaultCursorPosition;
+            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
+            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
         }
     }
 
@@ -2167,9 +2144,9 @@ public class MessageDatabase extends Ser
 
         MessageKeys remove(Transaction tx, Long key) throws IOException {
             MessageKeys result = defaultPriorityIndex.remove(tx, key);
-            if (result == null && highPriorityIndex != null) {
+            if (result == null && highPriorityIndex!=null) {
                 result = highPriorityIndex.remove(tx, key);
-                if (result == null && lowPriorityIndex != null) {
+                if (result ==null && lowPriorityIndex!=null) {
                     result = lowPriorityIndex.remove(tx, key);
                 }
             }
@@ -2292,14 +2269,14 @@ public class MessageDatabase extends Ser
         }
 
         void stoppedIterating() {
-            if (lastDefaultKey != null) {
-                cursor.defaultCursorPosition = lastDefaultKey.longValue() + 1;
+            if (lastDefaultKey!=null) {
+                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
             }
-            if (lastHighKey != null) {
-                cursor.highPriorityCursorPosition = lastHighKey.longValue() + 1;
+            if (lastHighKey!=null) {
+                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
             }
-            if (lastLowKey != null) {
-                cursor.lowPriorityCursorPosition = lastLowKey.longValue() + 1;
+            if (lastLowKey!=null) {
+                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
             }
             lastDefaultKey = null;
             lastHighKey = null;
@@ -2318,7 +2295,7 @@ public class MessageDatabase extends Ser
         }
 
         void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
-                           BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
+                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
 
             Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
             deletes.add(iterator.next());
@@ -2354,23 +2331,24 @@ public class MessageDatabase extends Ser
             }
         }
 
-        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException {
-            return new MessageOrderIterator(tx, cursor);
+        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
+            return new MessageOrderIterator(tx,cursor);
         }
 
-        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException {
-            return new MessageOrderIterator(tx, m);
+        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
+            return new MessageOrderIterator(tx,m);
         }
 
         public byte lastGetPriority() {
             return lastGetPriority;
         }
 
-        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>> {
-            Iterator<Entry<Long, MessageKeys>> currentIterator;
-            final Iterator<Entry<Long, MessageKeys>> highIterator;
-            final Iterator<Entry<Long, MessageKeys>> defaultIterator;
-            final Iterator<Entry<Long, MessageKeys>> lowIterator;
+        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
+            Iterator<Entry<Long, MessageKeys>>currentIterator;
+            final Iterator<Entry<Long, MessageKeys>>highIterator;
+            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
+            final Iterator<Entry<Long, MessageKeys>>lowIterator;
+
 
 
             MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import javax.xml.bind.annotation.XmlAnyAttribute;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.filter.AnyDestination;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
+ * distribution of destinations across multiple kahaDB persistence adapters
+ *
+ * @org.apache.xbean.XBean element="mKahaDB"
+ */
+public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
+    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
+
+    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
+    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
+
+    BrokerService brokerService;
+    List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
+    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
+
+    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
+
+    // all local store transactions are XA, 2pc if more than one adapter involved
+    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
+        @Override
+        public KahaTransactionInfo transform(TransactionId txid) {
+            if (txid == null) {
+                return null;
+            }
+            KahaTransactionInfo rc = new KahaTransactionInfo();
+            KahaXATransactionId kahaTxId = new KahaXATransactionId();
+            if (txid.isLocalTransaction()) {
+                LocalTransactionId t = (LocalTransactionId) txid;
+                kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
+                kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
+                kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
+            } else {
+                XATransactionId t = (XATransactionId) txid;
+                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
+                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
+                kahaTxId.setFormatId(t.getFormatId());
+            }
+            rc.setXaTransacitonId(kahaTxId);
+            return rc;
+        }
+    };
+
+    /**
+     * Sets the  FilteredKahaDBPersistenceAdapter entries
+     *
+     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
+     */
+    public void setFilteredPersistenceAdapters(List entries) {
+        for (Object entry : entries) {
+            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
+            KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
+            if (filteredAdapter.getDestination() == null) {
+                filteredAdapter.setDestination(matchAll);
+            }
+            adapter.setDirectory(new File(getDirectory(), nameFromDestinationFilter(filteredAdapter.getDestination())));
+
+            // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
+            adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+            adapters.add(adapter);
+        }
+        super.setEntries(entries);
+    }
+
+    private String nameFromDestinationFilter(ActiveMQDestination destination) {
+        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
+    }
+
+    public boolean isLocalXid(TransactionId xid) {
+        return xid instanceof XATransactionId &&
+                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
+    }
+
+    public void beginTransaction(ConnectionContext context) throws IOException {
+        throw new IllegalStateException();
+    }
+
+    public void checkpoint(final boolean sync) throws IOException {
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.checkpoint(sync);
+        }
+    }
+
+    public void commitTransaction(ConnectionContext context) throws IOException {
+        throw new IllegalStateException();
+    }
+
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
+        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
+    }
+
+    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
+        Object result = this.chooseValue(destination);
+        if (result == null) {
+            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
+        }
+        return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
+    }
+
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
+        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
+    }
+
+    public TransactionStore createTransactionStore() throws IOException {
+        return transactionStore;
+    }
+
+    public void deleteAllMessages() throws IOException {
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.deleteAllMessages();
+        }
+        transactionStore.deleteAllMessages();
+    }
+
+    public Set<ActiveMQDestination> getDestinations() {
+        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            results.addAll(persistenceAdapter.getDestinations());
+        }
+        return results;
+    }
+
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        long maxId = -1;
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
+        }
+        return maxId;
+    }
+
+    public long getLastProducerSequenceId(ProducerId id) throws IOException {
+        long maxId = -1;
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
+        }
+        return maxId;
+    }
+
+    public void removeQueueMessageStore(ActiveMQQueue destination) {
+        getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination);
+    }
+
+    public void removeTopicMessageStore(ActiveMQTopic destination) {
+        getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination);
+    }
+
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
+        throw new IllegalStateException();
+    }
+
+    public void setBrokerName(String brokerName) {
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.setBrokerName(brokerName);
+        }
+    }
+
+    public void setUsageManager(SystemUsage usageManager) {
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.setUsageManager(usageManager);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            size += persistenceAdapter.size();
+        }
+        return size;
+    }
+
+    public void start() throws Exception {
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.start();
+        }
+    }
+
+    public void stop() throws Exception {
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.stop();
+        }
+    }
+
+    public File getDirectory() {
+        return this.directory;
+    }
+
+    @Override
+    public void setDirectory(File dir) {
+        this.directory = directory;
+    }
+
+    public void setBrokerService(BrokerService brokerService) {
+        for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.setBrokerService(brokerService);
+        }
+        this.brokerService = brokerService;
+    }
+
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
+        this.transactionStore = transactionStore;
+    }
+
+    /**
+     * Set the max file length of the transaction journal
+     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
+     * be used
+     *
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
+     */
+    public void setJournalMaxFileLength(int maxFileLength) {
+        transactionStore.setJournalMaxFileLength(maxFileLength);
+    }
+
+    public int getJournalMaxFileLength() {
+        return transactionStore.getJournalMaxFileLength();
+    }
+
+    /**
+     * Set the max write batch size of  the transaction journal
+     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
+     * be used
+     *
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
+     */
+    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
+        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
+    }
+
+    public int getJournalMaxWriteBatchSize() {
+        return transactionStore.getJournalMaxWriteBatchSize();
+    }
+
+    @Override
+    public String toString() {
+        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
+        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message