activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1147149 [2/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/bugs/ test/java/org/apache/activemq/perf/ test/java/org/apache/activemq/usecases/
Date Fri, 15 Jul 2011 13:45:03 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Jul 15 13:45:01 2011
@@ -18,15 +18,7 @@ package org.apache.activemq.store.kahadb
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
+import java.io.*;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,6 +28,7 @@ import java.util.concurrent.locks.Reentr
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
@@ -44,27 +37,11 @@ import org.apache.activemq.command.Subsc
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
-import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
-import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
-import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
-import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
+import org.apache.activemq.store.kahadb.data.*;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
-import org.apache.kahadb.util.LocationMarshaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.DataFile;
@@ -73,16 +50,9 @@ import org.apache.kahadb.journal.Locatio
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LockFile;
-import org.apache.kahadb.util.LongMarshaller;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
-import org.apache.kahadb.util.StringMarshaller;
-import org.apache.kahadb.util.VariableMarshaller;
+import org.apache.kahadb.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 
@@ -92,9 +62,11 @@ public class MessageDatabase extends Ser
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
 
     protected static final Buffer UNMATCHED;
+
     static {
         UNMATCHED = new Buffer(new byte[]{});
     }
+
     private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
     private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
@@ -105,7 +77,6 @@ public class MessageDatabase extends Ser
 
     static final int VERSION = 3;
 
-
     protected class Metadata {
         protected Page<Metadata> page;
         protected int state;
@@ -115,6 +86,7 @@ public class MessageDatabase extends Ser
         protected Location producerSequenceIdTrackerLocation = null;
         protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
         protected int version = VERSION;
+
         public void read(DataInput is) throws IOException {
             state = is.readInt();
             destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
@@ -137,9 +109,9 @@ public class MessageDatabase extends Ser
             } catch (EOFException expectedOnUpgrade) {
             }
             try {
-               version = is.readInt();
-            }catch (EOFException expectedOnUpgrade) {
-                version=1;
+                version = is.readInt();
+            } catch (EOFException expectedOnUpgrade) {
+                version = 1;
             }
             LOG.info("KahaDB is version " + version);
         }
@@ -185,7 +157,7 @@ public class MessageDatabase extends Ser
     }
 
     protected PageFile pageFile;
-    protected Journal journal;
+    protected JournalManager journalManager;
     protected Metadata metadata = new Metadata();
 
     protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -195,12 +167,12 @@ public class MessageDatabase extends Ser
     protected boolean deleteAllMessages;
     protected File directory = new File("KahaDB");
     protected Thread checkpointThread;
-    protected boolean enableJournalDiskSyncs=true;
+    protected boolean enableJournalDiskSyncs = true;
     protected boolean archiveDataLogs;
     protected File directoryArchive;
     protected AtomicLong storeSize = new AtomicLong(0);
-    long checkpointInterval = 5*1000;
-    long cleanupInterval = 30*1000;
+    long checkpointInterval = 5 * 1000;
+    long cleanupInterval = 30 * 1000;
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
@@ -216,6 +188,8 @@ public class MessageDatabase extends Ser
     private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
     protected boolean forceRecoverIndex = false;
     private final Object checkpointThreadLock = new Object();
+    private boolean journalPerDestination = false;
+
 
     public MessageDatabase() {
     }
@@ -262,15 +236,15 @@ public class MessageDatabase extends Ser
             storedDestinations.clear();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
+                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext(); ) {
                         Entry<String, StoredDestination> entry = iterator.next();
-                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
+                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions != null);
                         storedDestinations.put(entry.getKey(), sd);
                     }
                 }
             });
             pageFile.flush();
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -297,11 +271,11 @@ public class MessageDatabase extends Ser
                             while (opened.get()) {
                                 Thread.sleep(sleepTime);
                                 long now = System.currentTimeMillis();
-                                if( now - lastCleanup >= cleanupInterval ) {
+                                if (now - lastCleanup >= cleanupInterval) {
                                     checkpointCleanup(true);
                                     lastCleanup = now;
                                     lastCheckpoint = now;
-                                } else if( now - lastCheckpoint >= checkpointInterval ) {
+                                } else if (now - lastCheckpoint >= checkpointInterval) {
                                     checkpointCleanup(false);
                                     lastCheckpoint = now;
                                 }
@@ -322,8 +296,8 @@ public class MessageDatabase extends Ser
     }
 
     public void open() throws IOException {
-        if( opened.compareAndSet(false, true) ) {
-            getJournal().start();
+        if (opened.compareAndSet(false, true)) {
+            getJournalManager().start();
             loadPageFile();
             startCheckpoint();
             recover();
@@ -375,18 +349,20 @@ public class MessageDatabase extends Ser
         try {
             lock();
             if (deleteAllMessages) {
-                getJournal().start();
-                getJournal().delete();
-                getJournal().close();
-                journal = null;
+                getJournalManager().start();
+                getJournalManager().delete();
+                getJournalManager().close();
+                journalManager = null;
                 getPageFile().delete();
                 LOG.info("Persistence store purged.");
                 deleteAllMessages = false;
             }
 
             open();
-            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
-        }finally {
+            for (Journal journal : getJournalManager().getJournals()) {
+                store(journal, new KahaTraceCommand().setMessage("LOADED " + new Date()));
+            }
+        } finally {
             this.indexLock.writeLock().unlock();
         }
 
@@ -394,32 +370,34 @@ public class MessageDatabase extends Ser
 
 
     public void close() throws IOException, InterruptedException {
-        if( opened.compareAndSet(true, false)) {
+        if (opened.compareAndSet(true, false)) {
             this.indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, true);
+                        for (Journal journal : getJournalManager().getJournals()) {
+                            checkpointUpdate(tx, journal, true);
+                        }
                     }
                 });
                 pageFile.unload();
                 metadata = new Metadata();
-            }finally {
+            } finally {
                 this.indexLock.writeLock().unlock();
             }
-            journal.close();
+            journalManager.close();
             synchronized (checkpointThreadLock) {
                 checkpointThread.join();
             }
             lockFile.unlock();
-            lockFile=null;
+            lockFile = null;
         }
     }
 
     public void unload() throws IOException, InterruptedException {
         this.indexLock.writeLock().lock();
         try {
-            if( pageFile != null && pageFile.isLoaded() ) {
+            if (pageFile != null && pageFile.isLoaded()) {
                 metadata.state = CLOSED_STATE;
                 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
 
@@ -429,7 +407,7 @@ public class MessageDatabase extends Ser
                     }
                 });
             }
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
         close();
@@ -444,7 +422,7 @@ public class MessageDatabase extends Ser
             }
             if (!preparedTransactions.isEmpty()) {
                 Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
-                if (l==null || t.compareTo(l) <= 0) {
+                if (l == null || t.compareTo(l) <= 0) {
                     l = t;
                 }
             }
@@ -455,63 +433,65 @@ public class MessageDatabase extends Ser
     /**
      * Move all the messages that were in the journal into long term storage. We
      * just replay and do a checkpoint.
-     *
-     * @throws IOException
-     * @throws IOException
-     * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
         this.indexLock.writeLock().lock();
         try {
+            for (Journal journal : getJournalManager().getJournals()) {
+                recover(journal);
+            }
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+    }
 
-            long start = System.currentTimeMillis();
-            Location producerAuditPosition = recoverProducerAudit();
-            Location lastIndoubtPosition = getRecoveryPosition();
+    private void recover(final Journal journal) throws IllegalStateException, IOException {
 
-            Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+        long start = System.currentTimeMillis();
+        Location producerAuditPosition = recoverProducerAudit(journal);
+        Location lastIndoubtPosition = getRecoveryPosition(journal);
 
-            if (recoveryPosition != null) {
-                int redoCounter = 0;
-                LOG.info("Recovering from the journal ...");
-                while (recoveryPosition != null) {
-                    JournalCommand<?> message = load(recoveryPosition);
-                    metadata.lastUpdate = recoveryPosition;
-                    process(message, recoveryPosition, lastIndoubtPosition);
-                    redoCounter++;
-                    recoveryPosition = journal.getNextLocation(recoveryPosition);
-                }
-                long end = System.currentTimeMillis();
-                LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+        Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+
+        if (recoveryPosition != null) {
+            int redoCounter = 0;
+            LOG.info("Recovering from the journal ...");
+            while (recoveryPosition != null) {
+                JournalCommand<?> message = load(journal, recoveryPosition);
+                metadata.lastUpdate = recoveryPosition;
+                process(message, recoveryPosition, lastIndoubtPosition);
+                redoCounter++;
+                recoveryPosition = journal.getNextLocation(recoveryPosition);
             }
+            long end = System.currentTimeMillis();
+            LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+        }
 
-            // We may have to undo some index updates.
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    recoverIndex(tx);
-                }
-            });
+        // We may have to undo some index updates.
+        pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                recoverIndex(tx, journal);
+            }
+        });
 
-            // rollback any recovered inflight local transactions
-            Set<TransactionId> toRollback = new HashSet<TransactionId>();
-            synchronized (inflightTransactions) {
-                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
-                    TransactionId id = it.next();
-                    if (id.isLocalTransaction()) {
-                        toRollback.add(id);
-                    }
-                }
-                for (TransactionId tx: toRollback) {
-                    LOG.debug("rolling back recovered indoubt local transaction " + tx);
-                    store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null);
+        // rollback any recovered inflight local transactions
+        Set<TransactionId> toRollback = new HashSet<TransactionId>();
+        synchronized (inflightTransactions) {
+            for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
+                TransactionId id = it.next();
+                if (id.isLocalTransaction()) {
+                    toRollback.add(id);
                 }
             }
-        }finally {
-            this.indexLock.writeLock().unlock();
+            for (TransactionId tx : toRollback) {
+                LOG.debug("rolling back recovered indoubt local transaction " + tx);
+                store(journal, new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null);
+            }
         }
     }
 
     private Location minimum(Location producerAuditPosition,
-            Location lastIndoubtPosition) {
+                             Location lastIndoubtPosition) {
         Location min = null;
         if (producerAuditPosition != null) {
             min = producerAuditPosition;
@@ -524,9 +504,9 @@ public class MessageDatabase extends Ser
         return min;
     }
 
-    private Location recoverProducerAudit() throws IOException {
+    private Location recoverProducerAudit(Journal journal) throws IOException {
         if (metadata.producerSequenceIdTrackerLocation != null) {
-            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
+            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(journal, metadata.producerSequenceIdTrackerLocation);
             try {
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
@@ -542,12 +522,12 @@ public class MessageDatabase extends Ser
         }
     }
 
-    protected void recoverIndex(Transaction tx) throws IOException {
+    protected void recoverIndex(Transaction tx, Journal journal) throws IOException {
         long start = System.currentTimeMillis();
         // It is possible index updates got applied before the journal updates..
         // in that case we need to remove references to messages that are not in the journal
         final Location lastAppendLocation = journal.getLastAppendLocation();
-        long undoCounter=0;
+        long undoCounter = 0;
 
         // Go through all the destinations to see if they have messages past the lastAppendLocation
         for (StoredDestination sd : storedDestinations.values()) {
@@ -573,7 +553,7 @@ public class MessageDatabase extends Ser
         }
 
         long end = System.currentTimeMillis();
-        if( undoCounter > 0 ) {
+        if (undoCounter > 0) {
             // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
             // should do sync writes to the journal.
             LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
@@ -589,12 +569,12 @@ public class MessageDatabase extends Ser
         for (StoredDestination sd : storedDestinations.values()) {
             // Use a visitor to cut down the number of pages that we load
             sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-                int last=-1;
+                int last = -1;
 
                 public boolean isInterestedInKeysBetween(Location first, Location second) {
-                    if( first==null ) {
+                    if (first == null) {
                         return !ss.contains(0, second.getDataFileId());
-                    } else if( second==null ) {
+                    } else if (second == null) {
                         return true;
                     } else {
                         return !ss.contains(first.getDataFileId(), second.getDataFileId());
@@ -604,7 +584,7 @@ public class MessageDatabase extends Ser
                 public void visit(List<Location> keys, List<Long> values) {
                     for (Location l : keys) {
                         int fileId = l.getDataFileId();
-                        if( last != fileId ) {
+                        if (last != fileId) {
                             ss.add(fileId);
                             last = fileId;
                         }
@@ -614,34 +594,34 @@ public class MessageDatabase extends Ser
             });
         }
         HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
-        while( !ss.isEmpty() ) {
-            missingJournalFiles.add( (int)ss.removeFirst() );
+        while (!ss.isEmpty()) {
+            missingJournalFiles.add((int) ss.removeFirst());
         }
-        missingJournalFiles.removeAll( journal.getFileMap().keySet() );
+        missingJournalFiles.removeAll(journal.getFileMap().keySet());
 
-        if( !missingJournalFiles.isEmpty() ) {
-            LOG.info("Some journal files are missing: "+missingJournalFiles);
+        if (!missingJournalFiles.isEmpty()) {
+            LOG.info("Some journal files are missing: " + missingJournalFiles);
         }
 
         ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
         for (Integer missing : missingJournalFiles) {
-            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
+            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
         }
 
-        if ( checkForCorruptJournalFiles ) {
+        if (checkForCorruptJournalFiles) {
             Collection<DataFile> dataFiles = journal.getFileMap().values();
             for (DataFile dataFile : dataFiles) {
                 int id = dataFile.getDataFileId();
-                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
+                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                 Sequence seq = dataFile.getCorruptedBlocks().getHead();
-                while( seq!=null ) {
-                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
+                while (seq != null) {
+                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
                     seq = seq.getNext();
                 }
             }
         }
 
-        if( !missingPredicates.isEmpty() ) {
+        if (!missingPredicates.isEmpty()) {
             for (StoredDestination sd : storedDestinations.values()) {
 
                 final ArrayList<Long> matches = new ArrayList<Long>();
@@ -653,11 +633,11 @@ public class MessageDatabase extends Ser
                 });
 
                 // If some message references are affected by the missing data files...
-                if( !matches.isEmpty() ) {
+                if (!matches.isEmpty()) {
 
                     // We either 'gracefully' recover dropping the missing messages or
                     // we error out.
-                    if( ignoreMissingJournalfiles ) {
+                    if (ignoreMissingJournalfiles) {
                         // Update the index to remove the references to the missing data
                         for (Long sequenceId : matches) {
                             MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
@@ -668,14 +648,14 @@ public class MessageDatabase extends Ser
                         }
 
                     } else {
-                        throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
+                        throw new IOException("Detected missing/corrupt journal files. " + matches.size() + " messages affected.");
                     }
                 }
             }
         }
 
         end = System.currentTimeMillis();
-        if( undoCounter > 0 ) {
+        if (undoCounter > 0) {
             // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
             // should do sync writes to the journal.
             LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
@@ -685,12 +665,12 @@ public class MessageDatabase extends Ser
     private Location nextRecoveryPosition;
     private Location lastRecoveryPosition;
 
-    public void incrementalRecover() throws IOException {
+    public void incrementalRecover(Journal journal) throws IOException {
         this.indexLock.writeLock().lock();
         try {
-            if( nextRecoveryPosition == null ) {
-                if( lastRecoveryPosition==null ) {
-                    nextRecoveryPosition = getRecoveryPosition();
+            if (nextRecoveryPosition == null) {
+                if (lastRecoveryPosition == null) {
+                    nextRecoveryPosition = getRecoveryPosition(journal);
                 } else {
                     nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
                 }
@@ -698,11 +678,11 @@ public class MessageDatabase extends Ser
             while (nextRecoveryPosition != null) {
                 lastRecoveryPosition = nextRecoveryPosition;
                 metadata.lastUpdate = lastRecoveryPosition;
-                JournalCommand<?> message = load(lastRecoveryPosition);
+                JournalCommand<?> message = load(journal, lastRecoveryPosition);
                 process(message, lastRecoveryPosition);
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -711,7 +691,7 @@ public class MessageDatabase extends Ser
         return metadata.lastUpdate;
     }
 
-    private Location getRecoveryPosition() throws IOException {
+    private Location getRecoveryPosition(Journal journal) throws IOException {
 
         if (!this.forceRecoverIndex) {
 
@@ -721,7 +701,7 @@ public class MessageDatabase extends Ser
             }
 
             // Perhaps there were no transactions...
-            if( metadata.lastUpdate!=null) {
+            if (metadata.lastUpdate != null) {
                 // Start replay at the record after the last one recorded in the index file.
                 return journal.getNextLocation(metadata.lastUpdate);
             }
@@ -731,38 +711,44 @@ public class MessageDatabase extends Ser
     }
 
     protected void checkpointCleanup(final boolean cleanup) throws IOException {
+        for (Journal journal : getJournalManager().getJournals()) {
+            checkpointCleanup(journal, cleanup);
+        }
+    }
+
+    protected void checkpointCleanup(final Journal journal, final boolean cleanup) throws IOException {
         long start;
         this.indexLock.writeLock().lock();
         try {
             start = System.currentTimeMillis();
-            if( !opened.get() ) {
+            if (!opened.get()) {
                 return;
             }
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    checkpointUpdate(tx, cleanup);
+                    checkpointUpdate(tx, journal, cleanup);
                 }
             });
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
         long end = System.currentTimeMillis();
-        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-            LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+            LOG.info("Slow KahaDB access: cleanup took " + (end - start));
         }
     }
 
 
-    public void checkpoint(Callback closure) throws Exception {
+    public void checkpoint(final Journal journal, Callback closure) throws Exception {
         this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    checkpointUpdate(tx, false);
+                    checkpointUpdate(tx, journal, false);
                 }
             });
             closure.execute();
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -770,17 +756,18 @@ public class MessageDatabase extends Ser
     // /////////////////////////////////////////////////////////////////
     // Methods called by the broker to update and query the store.
     // /////////////////////////////////////////////////////////////////
-    public Location store(JournalCommand<?> data) throws IOException {
-        return store(data, false, null,null);
+    public Location store(Journal journal, JournalCommand<?> data) throws IOException {
+        return store(journal, data, false, null, null);
     }
 
+
     /**
      * All updates are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
      * during a recovery process.
      */
-    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
+    public Location store(final Journal journal, JournalCommand<?> data, boolean sync, Runnable before, Runnable after) throws IOException {
         if (before != null) {
             before.run();
         }
@@ -795,14 +782,14 @@ public class MessageDatabase extends Ser
             long start2 = System.currentTimeMillis();
             process(data, location);
             long end = System.currentTimeMillis();
-            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-                LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+            if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+                LOG.info("Slow KahaDB access: Journal append took: " + (start2 - start) + " ms, Index update took " + (end - start2) + " ms");
             }
 
             this.indexLock.writeLock().lock();
             try {
                 metadata.lastUpdate = location;
-            }finally {
+            } finally {
                 this.indexLock.writeLock().unlock();
             }
             if (!checkpointThread.isAlive()) {
@@ -821,35 +808,27 @@ public class MessageDatabase extends Ser
 
     /**
      * Loads a previously stored JournalMessage
-     *
-     * @param location
-     * @return
-     * @throws IOException
      */
-    public JournalCommand<?> load(Location location) throws IOException {
+    public JournalCommand<?> load(Journal journal, Location location) throws IOException {
         long start = System.currentTimeMillis();
         ByteSequence data = journal.read(location);
         long end = System.currentTimeMillis();
-        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-            LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
+        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+            LOG.info("Slow KahaDB access: Journal read took: " + (end - start) + " ms");
         }
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         byte readByte = is.readByte();
         KahaEntryType type = KahaEntryType.valueOf(readByte);
-        if( type == null ) {
-            throw new IOException("Could not load journal record. Invalid location: "+location);
+        if (type == null) {
+            throw new IOException("Could not load journal record. Invalid location: " + location);
         }
-        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
+        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
         message.mergeFramed(is);
         return message;
     }
 
     /**
      * do minimal recovery till we reach the last inDoubtLocation
-     * @param data
-     * @param location
-     * @param inDoubtlocation
-     * @throws IOException
      */
     void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
@@ -921,7 +900,7 @@ public class MessageDatabase extends Ser
                         upadateIndex(tx, command, location);
                     }
                 });
-            }finally {
+            } finally {
                 this.indexLock.writeLock().unlock();
             }
         }
@@ -929,8 +908,8 @@ public class MessageDatabase extends Ser
 
     protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-           inflightTx.add(new RemoveOpperation(command, location));
+            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+            inflightTx.add(new RemoveOpperation(command, location));
         } else {
             this.indexLock.writeLock().lock();
             try {
@@ -939,7 +918,7 @@ public class MessageDatabase extends Ser
                         updateIndex(tx, command, location);
                     }
                 });
-            }finally {
+            } finally {
                 this.indexLock.writeLock().unlock();
             }
         }
@@ -954,7 +933,7 @@ public class MessageDatabase extends Ser
                     updateIndex(tx, command, location);
                 }
             });
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -967,7 +946,7 @@ public class MessageDatabase extends Ser
                     updateIndex(tx, command, location);
                 }
             });
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -995,7 +974,7 @@ public class MessageDatabase extends Ser
                     }
                 }
             });
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -1104,6 +1083,7 @@ public class MessageDatabase extends Ser
     }
 
     Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
+
     private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
         Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
         if (referenceFileIds == null) {
@@ -1156,9 +1136,9 @@ public class MessageDatabase extends Ser
         // If set then we are creating it.. otherwise we are destroying the sub
         if (command.hasSubscriptionInfo()) {
             sd.subscriptions.put(tx, subscriptionKey, command);
-            long ackLocation=NOT_ACKED;
+            long ackLocation = NOT_ACKED;
             if (!command.getRetroactive()) {
-                ackLocation = sd.orderIndex.nextMessageId-1;
+                ackLocation = sd.orderIndex.nextMessageId - 1;
             } else {
                 addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
             }
@@ -1175,19 +1155,19 @@ public class MessageDatabase extends Ser
      * @param tx
      * @throws IOException
      */
-    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
+    void checkpointUpdate(Transaction tx, Journal journal, boolean cleanup) throws IOException {
         LOG.debug("Checkpoint started.");
 
         // reflect last update exclusive of current checkpoint
         Location firstTxLocation = metadata.lastUpdate;
 
         metadata.state = OPEN_STATE;
-        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
+        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(journal);
         metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
         tx.store(metadata.page, metadataMarshaller, true);
         pageFile.flush();
 
-        if( cleanup ) {
+        if (cleanup) {
 
             final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
             final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
@@ -1195,21 +1175,22 @@ public class MessageDatabase extends Ser
             LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
 
             // Don't GC files under replication
-            if( journalFilesBeingReplicated!=null ) {
+            if (journalFilesBeingReplicated != null) {
                 gcCandidateSet.removeAll(journalFilesBeingReplicated);
             }
 
             // Don't GC files after the first in progress tx
-            if( metadata.firstInProgressTransactionLocation!=null ) {
+            if (metadata.firstInProgressTransactionLocation != null) {
                 if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
-                   firstTxLocation = metadata.firstInProgressTransactionLocation;
-                };
+                    firstTxLocation = metadata.firstInProgressTransactionLocation;
+                }
+                ;
             }
 
-            if( firstTxLocation!=null ) {
-                while( !gcCandidateSet.isEmpty() ) {
+            if (firstTxLocation != null) {
+                while (!gcCandidateSet.isEmpty()) {
                     Integer last = gcCandidateSet.last();
-                    if( last >= firstTxLocation.getDataFileId() ) {
+                    if (last >= firstTxLocation.getDataFileId()) {
                         gcCandidateSet.remove(last);
                     } else {
                         break;
@@ -1220,32 +1201,33 @@ public class MessageDatabase extends Ser
 
             // Go through all the destinations to see if any of them can remove GC candidates.
             for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
-                if( gcCandidateSet.isEmpty() ) {
+                if (gcCandidateSet.isEmpty()) {
                     break;
                 }
 
                 // Use a visitor to cut down the number of pages that we load
                 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-                    int last=-1;
+                    int last = -1;
+
                     public boolean isInterestedInKeysBetween(Location first, Location second) {
-                        if( first==null ) {
-                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
-                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                        if (first == null) {
+                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId() + 1);
+                            if (!subset.isEmpty() && subset.last() == second.getDataFileId()) {
                                 subset.remove(second.getDataFileId());
                             }
                             return !subset.isEmpty();
-                        } else if( second==null ) {
+                        } else if (second == null) {
                             SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
-                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                            if (!subset.isEmpty() && subset.first() == first.getDataFileId()) {
                                 subset.remove(first.getDataFileId());
                             }
                             return !subset.isEmpty();
                         } else {
-                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
-                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId() + 1);
+                            if (!subset.isEmpty() && subset.first() == first.getDataFileId()) {
                                 subset.remove(first.getDataFileId());
                             }
-                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                            if (!subset.isEmpty() && subset.last() == second.getDataFileId()) {
                                 subset.remove(second.getDataFileId());
                             }
                             return !subset.isEmpty();
@@ -1255,7 +1237,7 @@ public class MessageDatabase extends Ser
                     public void visit(List<Location> keys, List<Long> values) {
                         for (Location l : keys) {
                             int fileId = l.getDataFileId();
-                            if( last != fileId ) {
+                            if (last != fileId) {
                                 gcCandidateSet.remove(fileId);
                                 last = fileId;
                             }
@@ -1289,8 +1271,8 @@ public class MessageDatabase extends Ser
                 }
             }
 
-            if( !gcCandidateSet.isEmpty() ) {
-                LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+            if (!gcCandidateSet.isEmpty()) {
+                LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
                 journal.removeDataFiles(gcCandidateSet);
             }
         }
@@ -1298,13 +1280,13 @@ public class MessageDatabase extends Ser
         LOG.debug("Checkpoint done.");
     }
 
-    private Location checkpointProducerAudit() throws IOException {
+    private Location checkpointProducerAudit(Journal journal) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oout = new ObjectOutputStream(baos);
         oout.writeObject(metadata.producerSequenceIdTracker);
         oout.flush();
         oout.close();
-        return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
+        return store(journal, new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
     }
 
     public HashSet<Integer> getJournalFilesBeingReplicated() {
@@ -1330,13 +1312,13 @@ public class MessageDatabase extends Ser
         final Location location;
 
         public MessageKeys(String messageId, Location location) {
-            this.messageId=messageId;
-            this.location=location;
+            this.messageId = messageId;
+            this.location = location;
         }
 
         @Override
         public String toString() {
-            return "["+messageId+","+location+"]";
+            return "[" + messageId + "," + location + "]";
         }
     }
 
@@ -1452,20 +1434,20 @@ public class MessageDatabase extends Ser
                 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
                 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
             } else {
-                    // upgrade
-                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                        public void execute(Transaction tx) throws IOException {
-                            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-                            value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                            value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                            value.orderIndex.lowPriorityIndex.load(tx);
-
-                            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-                            value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                            value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                            value.orderIndex.highPriorityIndex.load(tx);
-                        }
-                    });
+                // upgrade
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                        value.orderIndex.lowPriorityIndex.load(tx);
+
+                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                        value.orderIndex.highPriorityIndex.load(tx);
+                    }
+                });
             }
 
             return value;
@@ -1493,12 +1475,12 @@ public class MessageDatabase extends Ser
 
         public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
             KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
-            rc.mergeFramed((InputStream)dataIn);
+            rc.mergeFramed((InputStream) dataIn);
             return rc;
         }
 
         public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
-            object.writeFramed((OutputStream)dataOut);
+            object.writeFramed((OutputStream) dataOut);
         }
     }
 
@@ -1588,7 +1570,7 @@ public class MessageDatabase extends Ser
                 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
                     Entry<String, LastAck> entry = iterator.next();
                     for (Iterator<Entry<Long, MessageKeys>> orderIterator =
-                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
+                                 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
                         Long sequence = orderIterator.next().getKey();
                         addAckLocation(tx, rc, sequence, entry.getKey());
                     }
@@ -1600,18 +1582,18 @@ public class MessageDatabase extends Ser
             if (rc.orderIndex.nextMessageId == 0) {
                 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
                 if (!rc.subscriptionAcks.isEmpty(tx)) {
-                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
                         Entry<String, LastAck> entry = iterator.next();
                         rc.orderIndex.nextMessageId =
-                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
+                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence + 1);
                     }
                 }
             } else {
                 // update based on ackPositions for unmatched, last entry is always the next
                 if (!rc.ackPositions.isEmpty(tx)) {
-                    Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
+                    Entry<Long, HashSet<String>> last = rc.ackPositions.getLast(tx);
                     rc.orderIndex.nextMessageId =
-                        Math.max(rc.orderIndex.nextMessageId, last.getKey());
+                            Math.max(rc.orderIndex.nextMessageId, last.getKey());
                 }
             }
 
@@ -1644,16 +1626,17 @@ public class MessageDatabase extends Ser
     }
 
     final HashSet nextMessageIdMarker = new HashSet<String>();
+
     // on a new message add, all existing subs are interested in this message
     private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
         HashSet hs = new HashSet<String>();
-        for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+        for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
             Entry<String, LastAck> entry = iterator.next();
             hs.add(entry.getKey());
         }
         sd.ackPositions.put(tx, messageSequence, hs);
         // add empty next to keep track of nextMessage
-        sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
+        sd.ackPositions.put(tx, messageSequence + 1, nextMessageIdMarker);
     }
 
     private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
@@ -1830,15 +1813,15 @@ public class MessageDatabase extends Ser
         return index;
     }
 
-    private Journal createJournal() throws IOException {
-        Journal manager = new Journal();
+    private JournalManager createJournalManager() throws IOException {
+        JournalManager manager = isJournalPerDestination() ? new DestinationJournalManager() : new DefaultJournalManager();
         manager.setDirectory(directory);
         manager.setMaxFileLength(getJournalMaxFileLength());
         manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
         manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
         manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
         manager.setArchiveDataLogs(isArchiveDataLogs());
-        manager.setSizeAccumulator(storeSize);
+        manager.setStoreSize(storeSize);
         if (getDirectoryArchive() != null) {
             IOHelper.mkdirs(getDirectoryArchive());
             manager.setDirectoryArchive(getDirectoryArchive());
@@ -1941,11 +1924,15 @@ public class MessageDatabase extends Ser
         return pageFile;
     }
 
-    public Journal getJournal() throws IOException {
-        if (journal == null) {
-            journal = createJournal();
+    public JournalManager getJournalManager() throws IOException {
+        if (journalManager == null) {
+            journalManager = createJournalManager();
         }
-        return journal;
+        return journalManager;
+    }
+
+    public Journal getJournal(ActiveMQDestination destination) throws IOException {
+        return getJournalManager().getJournal(destination);
     }
 
     public boolean isFailIfDatabaseIsLocked() {
@@ -2034,6 +2021,14 @@ public class MessageDatabase extends Ser
         this.databaseLockedWaitDelay = databaseLockedWaitDelay;
     }
 
+    public boolean isJournalPerDestination() {
+        return journalPerDestination;
+    }
+
+    public void setJournalPerDestination(boolean journalPerDestination) {
+        this.journalPerDestination = journalPerDestination;
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     // /////////////////////////////////////////////////////////////////
@@ -2061,23 +2056,24 @@ public class MessageDatabase extends Ser
         return rc;
     }
 
-    class MessageOrderCursor{
+    class MessageOrderCursor {
         long defaultCursorPosition;
         long lowPriorityCursorPosition;
         long highPriorityCursorPosition;
-        MessageOrderCursor(){
+
+        MessageOrderCursor() {
         }
 
-        MessageOrderCursor(long position){
-            this.defaultCursorPosition=position;
-            this.lowPriorityCursorPosition=position;
-            this.highPriorityCursorPosition=position;
+        MessageOrderCursor(long position) {
+            this.defaultCursorPosition = position;
+            this.lowPriorityCursorPosition = position;
+            this.highPriorityCursorPosition = position;
         }
 
-        MessageOrderCursor(MessageOrderCursor other){
-            this.defaultCursorPosition=other.defaultCursorPosition;
-            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
-            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
+        MessageOrderCursor(MessageOrderCursor other) {
+            this.defaultCursorPosition = other.defaultCursorPosition;
+            this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
+            this.highPriorityCursorPosition = other.highPriorityCursorPosition;
         }
 
         MessageOrderCursor copy() {
@@ -2085,33 +2081,33 @@ public class MessageDatabase extends Ser
         }
 
         void reset() {
-            this.defaultCursorPosition=0;
-            this.highPriorityCursorPosition=0;
-            this.lowPriorityCursorPosition=0;
+            this.defaultCursorPosition = 0;
+            this.highPriorityCursorPosition = 0;
+            this.lowPriorityCursorPosition = 0;
         }
 
         void increment() {
-            if (defaultCursorPosition!=0) {
+            if (defaultCursorPosition != 0) {
                 defaultCursorPosition++;
             }
-            if (highPriorityCursorPosition!=0) {
+            if (highPriorityCursorPosition != 0) {
                 highPriorityCursorPosition++;
             }
-            if (lowPriorityCursorPosition!=0) {
+            if (lowPriorityCursorPosition != 0) {
                 lowPriorityCursorPosition++;
             }
         }
 
         public String toString() {
-           return "MessageOrderCursor:[def:" + defaultCursorPosition
-                   + ", low:" + lowPriorityCursorPosition
-                   + ", high:" +  highPriorityCursorPosition + "]";
+            return "MessageOrderCursor:[def:" + defaultCursorPosition
+                    + ", low:" + lowPriorityCursorPosition
+                    + ", high:" + highPriorityCursorPosition + "]";
         }
 
         public void sync(MessageOrderCursor other) {
-            this.defaultCursorPosition=other.defaultCursorPosition;
-            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
-            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
+            this.defaultCursorPosition = other.defaultCursorPosition;
+            this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
+            this.highPriorityCursorPosition = other.highPriorityCursorPosition;
         }
     }
 
@@ -2132,9 +2128,9 @@ public class MessageDatabase extends Ser
 
         MessageKeys remove(Transaction tx, Long key) throws IOException {
             MessageKeys result = defaultPriorityIndex.remove(tx, key);
-            if (result == null && highPriorityIndex!=null) {
+            if (result == null && highPriorityIndex != null) {
                 result = highPriorityIndex.remove(tx, key);
-                if (result ==null && lowPriorityIndex!=null) {
+                if (result == null && lowPriorityIndex != null) {
                     result = lowPriorityIndex.remove(tx, key);
                 }
             }
@@ -2256,14 +2252,14 @@ public class MessageDatabase extends Ser
         }
 
         void stoppedIterating() {
-            if (lastDefaultKey!=null) {
-                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
+            if (lastDefaultKey != null) {
+                cursor.defaultCursorPosition = lastDefaultKey.longValue() + 1;
             }
-            if (lastHighKey!=null) {
-                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
+            if (lastHighKey != null) {
+                cursor.highPriorityCursorPosition = lastHighKey.longValue() + 1;
             }
-            if (lastLowKey!=null) {
-                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
+            if (lastLowKey != null) {
+                cursor.lowPriorityCursorPosition = lastLowKey.longValue() + 1;
             }
             lastDefaultKey = null;
             lastHighKey = null;
@@ -2282,7 +2278,7 @@ public class MessageDatabase extends Ser
         }
 
         void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
-                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
+                           BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
 
             Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
             deletes.add(iterator.next());
@@ -2318,24 +2314,23 @@ public class MessageDatabase extends Ser
             }
         }
 
-        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
-            return new MessageOrderIterator(tx,cursor);
+        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException {
+            return new MessageOrderIterator(tx, cursor);
         }
 
-        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
-            return new MessageOrderIterator(tx,m);
+        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException {
+            return new MessageOrderIterator(tx, m);
         }
 
         public byte lastGetPriority() {
             return lastGetPriority;
         }
 
-        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
-            Iterator<Entry<Long, MessageKeys>>currentIterator;
-            final Iterator<Entry<Long, MessageKeys>>highIterator;
-            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
-            final Iterator<Entry<Long, MessageKeys>>lowIterator;
-
+        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>> {
+            Iterator<Entry<Long, MessageKeys>> currentIterator;
+            final Iterator<Entry<Long, MessageKeys>> highIterator;
+            final Iterator<Entry<Long, MessageKeys>> defaultIterator;
+            final Iterator<Entry<Long, MessageKeys>> lowIterator;
 
 
             MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java Fri Jul 15 13:45:01 2011
@@ -27,8 +27,6 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.util.DefaultIOExceptionHandler;
 import org.junit.After;
 import org.junit.Test;
-
-
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
@@ -58,7 +56,7 @@ public class AMQ2736Test {
 
         // test hack, close the journal to ensure no further journal updates when broker stops
         // mimic kill -9 in terms of no normal shutdown sequence
-        store.getJournal().close();
+        store.getJournalManager().close();
         try {
             store.close();
         } catch (Exception expectedLotsAsJournalBorked) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java Fri Jul 15 13:45:01 2011
@@ -16,11 +16,7 @@
  */
 package org.apache.activemq.bugs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.BytesMessage;
@@ -32,7 +28,6 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.broker.BrokerService;
@@ -42,6 +37,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class AMQ2982Test {
 
@@ -65,7 +62,7 @@ public class AMQ2982Test {
             // ensure save memory publishing, use the right lock
             indexLock.readLock().lock();
             try {
-                return getJournal().getFileMap().size();
+                return getJournalManager().getFileMap().size();
             } finally {
                 indexLock.readLock().unlock();
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java Fri Jul 15 13:45:01 2011
@@ -16,10 +16,6 @@
  */
 package org.apache.activemq.bugs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -34,7 +30,6 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.KahaDBStore;
@@ -42,6 +37,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import static org.junit.Assert.*;
 
 public class AMQ2983Test {
 
@@ -67,7 +63,7 @@ public class AMQ2983Test {
             // ensure save memory publishing, use the right lock
             indexLock.readLock().lock();
             try {
-                return getJournal().getFileMap().size();
+                return getJournalManager().getFileMap().size();
             } finally {
                 indexLock.readLock().unlock();
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java Fri Jul 15 13:45:01 2011
@@ -31,9 +31,9 @@ public class SimpleDurableTopicTest exte
     protected long initialConsumerDelay = 0;
     @Override
     protected void setUp() throws Exception {
-        numberOfDestinations=1;
+        numberOfDestinations=10;
         numberOfConsumers = 1;
-        numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "20"), 20);
+        numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "1"));
         sampleCount= Integer.parseInt(System.getProperty("SimpleDurableTopicTest.sampleCount", "1000"), 10);
         playloadSize = 1024;
         super.setUp();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Fri Jul 15 13:45:01 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.usecases;
 
 import java.util.Vector;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -66,7 +67,7 @@ public class DurableSubscriptionOfflineT
     public static Test suite() {
         return suite(DurableSubscriptionOfflineTest.class);
     }
-    
+
     protected void setUp() throws Exception {
         exceptions.clear();
         topic = (ActiveMQTopic) createDestination();
@@ -82,9 +83,9 @@ public class DurableSubscriptionOfflineT
     private void createBroker() throws Exception {
         createBroker(true);
     }
-    
+
     private void createBroker(boolean deleteAllMessages) throws Exception {
-        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
+        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")");
         broker.setBrokerName(getName(true));
         broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         broker.getManagementContext().setCreateConnector(false);
@@ -96,14 +97,14 @@ public class DurableSubscriptionOfflineT
             policyMap.setDefaultEntry(policy);
             broker.setDestinationPolicy(policyMap);
         }
-        
+
         setDefaultPersistenceAdapter(broker);
         if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
             // ensure it kicks in during tests
-            ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
+            ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).setCleanupPeriod(2 * 1000);
         } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
             // have lots of journal files
-            ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
+            ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
         }
         broker.start();
     }
@@ -115,9 +116,9 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+                new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
 
     public void testConsumeOnlyMatchedMessages() throws Exception {
@@ -162,110 +163,110 @@ public class DurableSubscriptionOfflineT
         assertEquals(sent, listener.count);
     }
 
-     public void testConsumeAllMatchedMessages() throws Exception {
-         // create durable subscription
-         Connection con = createConnection();
-         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         session.close();
-         con.close();
-
-         // send messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(null);
-
-         int sent = 0;
-         for (int i = 0; i < 10; i++) {
-             sent++;
-             Message message = session.createMessage();
-             message.setStringProperty("filter", "true");
-             producer.send(topic, message);
-         }
-
-         Thread.sleep(1 * 1000);
-
-         session.close();
-         con.close();
-
-         // consume messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         Listener listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
+    public void testConsumeAllMatchedMessages() throws Exception {
+        // create durable subscription
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
 
-         session.close();
-         con.close();
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+
+        session.close();
+        con.close();
+
+        // consume messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+    }
 
-         assertEquals(sent, listener.count);
-     }
 
-    
     public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-               new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+                new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
 
-     public void testVerifyAllConsumedAreAcked() throws Exception {
-         // create durable subscription
-         Connection con = createConnection();
-         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         session.close();
-         con.close();
-
-         // send messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(null);
-
-         int sent = 0;
-         for (int i = 0; i < 10; i++) {
-             sent++;
-             Message message = session.createMessage();
-             message.setStringProperty("filter", "true");
-             producer.send(topic, message);
-         }
-
-         Thread.sleep(1 * 1000);
-
-         session.close();
-         con.close();
-
-         // consume messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         Listener listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
-
-         session.close();
-         con.close();
-
-         LOG.info("Consumed: " + listener.count);
-         assertEquals(sent, listener.count);
-
-         // consume messages again, should not get any
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         listener = new Listener();
-         consumer.setMessageListener(listener);
+    public void testVerifyAllConsumedAreAcked() throws Exception {
+        // create durable subscription
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
 
-         Thread.sleep(3 * 1000);
+        Thread.sleep(1 * 1000);
 
-         session.close();
-         con.close();
+        session.close();
+        con.close();
 
-         assertEquals(0, listener.count);
-     }
+        // consume messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        LOG.info("Consumed: " + listener.count);
+        assertEquals(sent, listener.count);
+
+        // consume messages again, should not get any
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(0, listener.count);
+    }
 
     public void testTwoOfflineSubscriptionCanConsume() throws Exception {
         // create durable subscription 1
@@ -323,9 +324,9 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+                new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
 
     public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
@@ -474,14 +475,15 @@ public class DurableSubscriptionOfflineT
         con.close();
 
         assertEquals("offline consumer got all", sent, listener.count);
-    }    
+    }
 
     public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
+
     public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
@@ -629,9 +631,9 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
-    
+
     public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
@@ -672,7 +674,7 @@ public class DurableSubscriptionOfflineT
         Thread.sleep(3 * 1000);
         broker.stop();
         createBroker(false /*deleteAllMessages*/);
- 
+
         // send more messages
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -719,7 +721,7 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestOfflineAfterRestart() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     public void testOfflineSubscriptionAfterRestart() throws Exception {
@@ -855,7 +857,7 @@ public class DurableSubscriptionOfflineT
 
         int filtered = 0;
         for (int i = 0; i < 10; i++) {
-            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
+            boolean filter = (i % 2 == 0); //(int) (Math.random() * 2) >= 1;
             if (filter)
                 filtered++;
 
@@ -953,7 +955,7 @@ public class DurableSubscriptionOfflineT
         sent = 0;
         for (int i = 0; i < 2; i++) {
             Message message = session.createMessage();
-            message.setStringProperty("filter", i==1 ? "true" : "false");
+            message.setStringProperty("filter", i == 1 ? "true" : "false");
             producer.send(topic, message);
             sent++;
         }
@@ -961,7 +963,7 @@ public class DurableSubscriptionOfflineT
         Thread.sleep(1 * 1000);
         session.close();
         con.close();
- 
+
         LOG.info("cli1 again, should get 1 new ones");
         con = createConnection("cli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -1059,7 +1061,7 @@ public class DurableSubscriptionOfflineT
     // use very small journal to get lots of files to cleanup
     public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
         this.addCombinationValues("journalMaxFileLength",
-                new Object[]{new Integer(64*1024)});
+                new Object[]{new Integer(64 * 1024)});
     }
 
     // https://issues.apache.org/jira/browse/AMQ-3206
@@ -1081,7 +1083,7 @@ public class DurableSubscriptionOfflineT
         MessageProducer producer = session.createProducer(null);
 
         final int toSend = 500;
-        final String payload = new byte[40*1024].toString();
+        final String payload = new byte[40 * 1024].toString();
         int sent = 0;
         for (int i = sent; i < toSend; i++) {
             Message message = session.createTextMessage(payload);
@@ -1108,7 +1110,7 @@ public class DurableSubscriptionOfflineT
         consumer.setMessageListener(listener);
         assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
-                LOG.info("Want: " + toSend  + ", current: " + listener.count);
+                LOG.info("Want: " + toSend + ", current: " + listener.count);
                 return listener.count == toSend;
             }
         }));
@@ -1118,7 +1120,7 @@ public class DurableSubscriptionOfflineT
         destroyBroker();
         createBroker(false);
         KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-        assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
+        assertEquals("only one journal file left after restart", 1, pa.getStore().getJournalManager().getFileMap().size());
     }
 
     public static class Listener implements MessageListener {
@@ -1127,20 +1129,23 @@ public class DurableSubscriptionOfflineT
 
         Listener() {
         }
+
         Listener(String id) {
             this.id = id;
         }
+
         public void onMessage(Message message) {
             count++;
             if (id != null) {
                 try {
                     LOG.info(id + ", " + message.getJMSMessageID());
-                } catch (Exception ignored) {}
+                } catch (Exception ignored) {
+                }
             }
         }
     }
 
-    public class FilterCheckListener extends Listener  {
+    public class FilterCheckListener extends Listener {
 
         public void onMessage(Message message) {
             count++;
@@ -1150,13 +1155,11 @@ public class DurableSubscriptionOfflineT
                 if (b != null) {
                     boolean c = message.getBooleanProperty("$c");
                     assertTrue("", c);
-                }
-                else {
+                } else {
                     String d = message.getStringProperty("$d");
                     assertTrue("", "D1".equals(d) || "D2".equals(d));
                 }
-            }
-            catch (JMSException e) {
+            } catch (JMSException e) {
                 exceptions.add(e);
             }
         }



Mime
View raw message