activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6203
Date Mon, 14 Mar 2016 15:05:18 GMT
Repository: activemq
Updated Branches:
  refs/heads/master b39ab7867 -> 946e62d70


https://issues.apache.org/jira/browse/AMQ-6203

Rewrite older acks that can be preventing GC of log files.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/946e62d7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/946e62d7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/946e62d7

Branch: refs/heads/master
Commit: 946e62d702d2bf5fbcdb0ed4cb6977046acb659b
Parents: b39ab78
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Mar 14 11:04:57 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Mar 14 11:04:57 2016 -0400

----------------------------------------------------------------------
 .../apache/activemq/util/ThreadPoolUtils.java   |  40 ++-
 .../activemq/store/kahadb/MessageDatabase.java  | 345 +++++++++++++++----
 .../apache/activemq/store/kahadb/Visitor.java   |   4 +
 .../store/kahadb/disk/journal/DataFile.java     |  23 +-
 .../kahadb/disk/journal/DataFileAccessor.java   |   5 +-
 .../kahadb/disk/journal/DataFileAppender.java   |  32 +-
 .../store/kahadb/disk/journal/FileAppender.java |   6 +-
 .../store/kahadb/disk/journal/Journal.java      | 124 +++++--
 .../disk/journal/TargetedDataFileAppender.java  | 297 ++++++++++++++++
 .../disk/util/DataByteArrayInputStream.java     |  31 +-
 .../disk/util/DataByteArrayOutputStream.java    |  24 +-
 .../src/main/proto/journal-data.proto           |  12 +
 .../journal/TargetedDataFileAppenderTest.java   | 116 +++++++
 .../TransactedStoreUsageSuspendResumeTest.java  |   4 +-
 14 files changed, 895 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
index 27b69fc..554f730 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
@@ -82,7 +82,9 @@ public final class ThreadPoolUtils {
      * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
      * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
      * is used as timeout value waiting for orderly shutdown to
-     * complete normally, before going aggressively.
+     * complete normally, before going aggressively.  If the shutdownAwaitTermination
+     * value is negative the shutdown waits indefinitely for the ExecutorService
+     * to complete its shutdown.
      *
      * @param executorService the executor service to shutdown
      * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
@@ -130,6 +132,19 @@ public final class ThreadPoolUtils {
                         Thread.currentThread().interrupt();
                     }
                 }
+            } else  if (shutdownAwaitTermination < 0) {
+                try {
+                    awaitTermination(executorService);
+                } catch (InterruptedException e) {
+                    warned = true;
+                    LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
+                    // we were interrupted during shutdown, so force shutdown
+                    try {
+                        executorService.shutdownNow();
+                    } finally {
+                        Thread.currentThread().interrupt();
+                    }
+                }
             }
 
             // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
@@ -144,6 +159,29 @@ public final class ThreadPoolUtils {
     }
 
     /**
+     * Awaits the termination of the thread pool indefinitely (Use with Caution).
+     * <p/>
+     * This implementation will log every 2nd second at INFO level that we are waiting, so the end user
+     * can see we are not hanging in case it takes longer time to terminate the pool.
+     *
+     * @param executorService            the thread pool
+     *
+     * @throws InterruptedException is thrown if we are interrupted during the waiting
+     */
+    public static void awaitTermination(ExecutorService executorService) throws InterruptedException {
+        // log progress every 5th second so end user is aware of we are shutting down
+        StopWatch watch = new StopWatch();
+        final long interval = 2000;
+        while (true) {
+            if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
+                return;
+            } else {
+                LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
+            }
+        }
+    }
+
+    /**
      * Awaits the termination of the thread pool.
      * <p/>
      * This implementation will log every 2nd second at INFO level that we are waiting, so the end user

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 931a18b..434e49c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -48,6 +48,10 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,6 +77,7 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -84,6 +89,7 @@ import org.apache.activemq.store.kahadb.disk.index.ListIndex;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
 import org.apache.activemq.store.kahadb.disk.page.Page;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
@@ -97,9 +103,11 @@ import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +130,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     static final int VERSION = 6;
 
+    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
+
     protected class Metadata {
         protected Page<Metadata> page;
         protected int state;
@@ -234,8 +244,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     protected boolean deleteAllMessages;
     protected File directory = DEFAULT_DIRECTORY;
     protected File indexDirectory = null;
-    protected Thread checkpointThread;
-    protected boolean enableJournalDiskSyncs=true;
+    protected ScheduledExecutorService scheduler;
+    private final Object schedulerLock = new Object();
+
+    protected boolean enableJournalDiskSyncs = true;
     protected boolean archiveDataLogs;
     protected File directoryArchive;
     protected AtomicLong journalSize = new AtomicLong(0);
@@ -254,7 +266,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean checkForCorruptJournalFiles = false;
     private boolean checksumJournalFiles = true;
     protected boolean forceRecoverIndex = false;
-    private final Object checkpointThreadLock = new Object();
     private boolean archiveCorruptedIndex = false;
     private boolean useIndexLFRUEviction = false;
     private float indexLFUEvictionFactor = 0.2f;
@@ -263,6 +274,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean enableIndexPageCaching = true;
     ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
 
+    private int compactAcksAfterNoGC = 10;
+    private boolean compactAcksIgnoresStoreGrowth = false;
+    private int checkPointCyclesWithNoGC;
+    private int journalLogOnLastCompactionCheck;
+
     @Override
     public void doStart() throws Exception {
         load();
@@ -330,51 +346,59 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     }
 
     private void startCheckpoint() {
-        if (checkpointInterval == 0 &&  cleanupInterval == 0) {
+        if (checkpointInterval == 0 && cleanupInterval == 0) {
             LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
             return;
         }
-        synchronized (checkpointThreadLock) {
-            boolean start = false;
-            if (checkpointThread == null) {
-                start = true;
-            } else if (!checkpointThread.isAlive()) {
-                start = true;
-                LOG.info("KahaDB: Recovering checkpoint thread after death");
-            }
-            if (start) {
-                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+        synchronized (schedulerLock) {
+            if (scheduler == null) {
+                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
                     @Override
-                    public void run() {
-                        try {
-                            long lastCleanup = System.currentTimeMillis();
-                            long lastCheckpoint = System.currentTimeMillis();
-                            // Sleep for a short time so we can periodically check
-                            // to see if we need to exit this thread.
-                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
-                            while (opened.get()) {
-                                Thread.sleep(sleepTime);
-                                long now = System.currentTimeMillis();
-                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
-                                    checkpointCleanup(true);
-                                    lastCleanup = now;
-                                    lastCheckpoint = now;
-                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
-                                    checkpointCleanup(false);
-                                    lastCheckpoint = now;
-                                }
-                            }
-                        } catch (InterruptedException e) {
-                            // Looks like someone really wants us to exit this thread...
-                        } catch (IOException ioe) {
-                            LOG.error("Checkpoint failed", ioe);
-                            brokerService.handleIOException(ioe);
-                        }
+                    public Thread newThread(Runnable r) {
+                        Thread schedulerThread = new Thread(r);
+
+                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
+                        schedulerThread.setDaemon(true);
+
+                        return schedulerThread;
                     }
-                };
+                });
 
-                checkpointThread.setDaemon(true);
-                checkpointThread.start();
+                // Short intervals for check-point and cleanups
+                long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+
+                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    private final class CheckpointRunner implements Runnable {
+
+        private long lastCheckpoint = System.currentTimeMillis();
+        private long lastCleanup = System.currentTimeMillis();
+
+        @Override
+        public void run() {
+            try {
+                // Decide on cleanup vs full checkpoint here.
+                if (opened.get()) {
+                    long now = System.currentTimeMillis();
+                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
+                        checkpointCleanup(true);
+                        lastCleanup = now;
+                        lastCheckpoint = now;
+                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
+                        checkpointCleanup(false);
+                        lastCheckpoint = now;
+                    }
+                }
+            } catch (IOException ioe) {
+                LOG.error("Checkpoint failed", ioe);
+                brokerService.handleIOException(ioe);
+            } catch (Throwable e) {
+                LOG.error("Checkpoint failed", e);
+                brokerService.handleIOException(IOExceptionSupport.create(e));
             }
         }
     }
@@ -444,12 +468,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 checkpointLock.writeLock().unlock();
             }
             journal.close();
-            synchronized (checkpointThreadLock) {
-                if (checkpointThread != null) {
-                    checkpointThread.join();
-                }
-            }
-            //clear the cache and journalSize on shutdown of the store
+            ThreadPoolUtils.shutdownGraceful(scheduler, -1);
+            // clear the cache and journalSize on shutdown of the store
             storeCache.clear();
             journalSize.set(0);
         }
@@ -503,11 +523,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     @SuppressWarnings("rawtypes")
     private void trackMaxAndMin(Location[] range, List<Operation> ops) {
         Location t = ops.get(0).getLocation();
-        if (range[0]==null || t.compareTo(range[0]) <= 0) {
+        if (range[0] == null || t.compareTo(range[0]) <= 0) {
             range[0] = t;
         }
         t = ops.get(ops.size() -1).getLocation();
-        if (range[1]==null || t.compareTo(range[1]) >= 0) {
+        if (range[1] == null || t.compareTo(range[1]) >= 0) {
             range[1] = t;
         }
     }
@@ -776,7 +796,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
         }
 
-        if( undoCounter > 0 ) {
+        if (undoCounter > 0) {
             // The rolledback operations are basically in flight journal writes.  To avoid getting
             // these the end user should do sync writes to the journal.
             if (LOG.isInfoEnabled()) {
@@ -909,7 +929,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
         }
 
-        if( undoCounter > 0 ) {
+        if (undoCounter > 0) {
             // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
             // should do sync writes to the journal.
             if (LOG.isInfoEnabled()) {
@@ -1019,31 +1039,31 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
         try {
             ByteSequence sequence = toByteSequence(data);
-
             Location location;
+
             checkpointLock.readLock().lock();
             try {
 
                 long start = System.currentTimeMillis();
-                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
+                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
                 long start2 = System.currentTimeMillis();
                 process(data, location, before);
 
                 long end = System.currentTimeMillis();
-                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
                     if (LOG.isInfoEnabled()) {
                         LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
                     }
                 }
-
-            } finally{
+            } finally {
                 checkpointLock.readLock().unlock();
             }
+
             if (after != null) {
                 after.run();
             }
 
-            if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) {
+            if (scheduler == null && opened.get()) {
                 startCheckpoint();
             }
             return location;
@@ -1167,6 +1187,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             public void visit(KahaUpdateMessageCommand command) throws IOException {
                 process(command, location);
             }
+
+            @Override
+            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
+                process(command, location);
+            }
         });
     }
 
@@ -1323,6 +1348,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
+        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
+        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
+            // Mark the current journal file as a compacted file so that gc checks can skip
+            // over logs that are smaller compaction type logs.
+            DataFile current = journal.getDataFileById(location.getDataFileId());
+            current.setTypeCode(command.getRewriteType());
+
+            // Move offset so that next location read jumps to next file.
+            location.setOffset(journalMaxFileLength);
+        }
+    }
+
     // /////////////////////////////////////////////////////////////////
     // These methods do the actual index updates.
     // /////////////////////////////////////////////////////////////////
@@ -1595,7 +1633,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         tx.store(metadata.page, metadataMarshaller, true);
         pageFile.flush();
 
-        if( cleanup ) {
+        if (cleanup) {
 
             final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
             final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
@@ -1743,6 +1781,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 LOG.trace("gc candidates: " + gcCandidateSet);
                 LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
             }
+
             boolean ackMessageFileMapMod = false;
             Iterator<Integer> candidates = gcCandidateSet.iterator();
             while (candidates.hasNext()) {
@@ -1768,9 +1807,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
 
             if (!gcCandidateSet.isEmpty()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
-                }
+                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
                 journal.removeDataFiles(gcCandidateSet);
                 for (Integer candidate : gcCandidateSet) {
                     for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
@@ -1780,12 +1817,153 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 if (ackMessageFileMapMod) {
                     checkpointUpdate(tx, false);
                 }
+            } else {
+                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
+                    // First check length of journal to make sure it makes sense to even try.
+                    //
+                    // If there is only one journal file with Acks in it we don't need to move
+                    // it since it won't be chained to any later logs.
+                    //
+                    // If the logs haven't grown since the last time then we need to compact
+                    // otherwise there seems to still be room for growth and we don't need to incur
+                    // the overhead.  Depending on configuration this check can be avoided and
+                    // Ack compaction will run any time the store has not GC'd a journal file in
+                    // the configured amount of cycles.
+                    if (metadata.ackMessageFileMap.size() > 1 &&
+                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
+
+                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
+                        try {
+                            scheduler.execute(new AckCompactionRunner());
+                        } catch (Exception ex) {
+                            LOG.warn("Error on queueing the Ack Compactor", ex);
+                        }
+                    } else {
+                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
+                    }
+
+                    checkPointCyclesWithNoGC = 0;
+                } else {
+                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
+                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
+                }
+
+                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
             }
         }
 
         LOG.debug("Checkpoint done.");
     }
 
+    private final class AckCompactionRunner implements Runnable {
+
+        @Override
+        public void run() {
+            // Lock index to capture the ackMessageFileMap data
+            indexLock.writeLock().lock();
+
+            // Map keys might not be sorted, find the earliest log file to forward acks
+            // from and move only those, future cycles can chip away at more as needed.
+            // We won't move files that are themselves rewritten on a previous compaction.
+            List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+            Collections.sort(journalFileIds);
+            int journalToAdvance = -1;
+            for (Integer journalFileId : journalFileIds) {
+                DataFile current = journal.getDataFileById(journalFileId);
+                if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+                    journalToAdvance = journalFileId;
+                    break;
+                }
+            }
+
+            // Check if we found one, or if we only found the current file being written to.
+            if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+                return;
+            }
+
+            Set<Integer> journalLogsReferenced =
+                new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
+
+            indexLock.writeLock().unlock();
+
+            try {
+                // Background rewrite of the old acks
+                forwardAllAcks(journalToAdvance, journalLogsReferenced);
+
+                // Checkpoint with changes from the ackMessageFileMap
+                checkpointUpdate(false);
+            } catch (IOException ioe) {
+                LOG.error("Checkpoint failed", ioe);
+                brokerService.handleIOException(ioe);
+            } catch (Throwable e) {
+                LOG.error("Checkpoint failed", e);
+                brokerService.handleIOException(IOExceptionSupport.create(e));
+            }
+        }
+    }
+
+    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
+        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
+
+        DataFile forwardsFile = journal.reserveDataFile();
+        LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
+
+        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
+
+        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
+            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
+            compactionMarker.setSourceDataFileId(journalToRead);
+            compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
+
+            ByteSequence payload = toByteSequence(compactionMarker);
+            appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
+            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
+
+            Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
+            while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
+                JournalCommand<?> command = null;
+                try {
+                    command = load(nextLocation);
+                } catch (IOException ex) {
+                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
+                }
+
+                if (command != null && command instanceof KahaRemoveMessageCommand) {
+                    payload = toByteSequence(command);
+                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
+                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
+                }
+
+                nextLocation = journal.getNextLocation(nextLocation);
+            }
+        }
+
+        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
+
+        // Lock index while we update the ackMessageFileMap.
+        indexLock.writeLock().lock();
+
+        // Update the ack map with the new locations of the acks
+        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
+            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
+            if (referenceFileIds == null) {
+                referenceFileIds = new HashSet<Integer>();
+                referenceFileIds.addAll(entry.getValue());
+                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
+            } else {
+                referenceFileIds.addAll(entry.getValue());
+            }
+        }
+
+        // remove the old location data from the ack map so that the old journal log file can
+        // be removed on next GC.
+        metadata.ackMessageFileMap.remove(journalToRead);
+
+        indexLock.writeLock().unlock();
+
+        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
+    }
+
     final Runnable nullCompletionCallback = new Runnable() {
         @Override
         public void run() {
@@ -1943,7 +2121,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
-
     class StoredDestination {
 
         MessageOrderIndex orderIndex = new MessageOrderIndex();
@@ -2708,7 +2885,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
             }
         }
-
     }
 
     class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
@@ -2728,7 +2904,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     // /////////////////////////////////////////////////////////////////
 
     private PageFile createPageFile() throws IOException {
-        if( indexDirectory == null ) {
+        if (indexDirectory == null) {
             indexDirectory = directory;
         }
         IOHelper.mkdirs(indexDirectory);
@@ -3456,4 +3632,43 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     public void setPreallocationStrategy(String preallocationStrategy) {
         this.preallocationStrategy = preallocationStrategy;
     }
+
+    public int getCompactAcksAfterNoGC() {
+        return compactAcksAfterNoGC;
+    }
+
+    /**
+     * Sets the number of GC cycles where no journal logs were removed before an attempt to
+     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
+     * <p>
+     * A value of -1 will disable this feature.
+     *
+     * @param compactAcksAfterNoGC
+     *      Number of empty GC cycles before we rewrite old ACKS.
+     */
+    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
+        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
+    }
+
+    /**
+     * Returns whether Ack compaction will ignore that the store is still growing
+     * and run more often.
+     *
+     * @return the compactAcksIgnoresStoreGrowth current value.
+     */
+    public boolean isCompactAcksIgnoresStoreGrowth() {
+        return compactAcksIgnoresStoreGrowth;
+    }
+
+    /**
+     * Configure if Ack compaction will occur regardless of continued growth of the
+     * journal logs meaning that the store has not run out of space yet.  Because the
+     * compaction operation can be costly this value is defaulted to off and the Ack
+     * compaction is only done when it seems that the store cannot grow and larger.
+     *
+     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
+     */
+    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
+        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
index 43fc152..641f176 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
@@ -30,6 +30,7 @@ import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
 import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
+import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -84,4 +85,7 @@ public class Visitor {
 
     public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
     }
+
+    public void visit(KahaRewrittenDataFileCommand kahaUpdateMessageCommand) throws IOException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index f1e078d..126d82b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -18,29 +18,23 @@ package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
 import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * DataFile
- *
- *
  */
 public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DataFile.class);
+    public final static byte STANDARD_LOG_FILE = 0x0;
 
     protected final File file;
     protected final Integer dataFileId;
     protected volatile int length;
+    protected int typeCode = STANDARD_LOG_FILE;
     protected final SequenceSet corruptedBlocks = new SequenceSet();
 
     DataFile(File file, int number) {
@@ -57,6 +51,14 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
         return dataFileId;
     }
 
+    public int getTypeCode() {
+        return typeCode;
+    }
+
+    public void setTypeCode(int typeCode) {
+        this.typeCode = typeCode;
+    }
+
     public synchronized int getLength() {
         return length;
     }
@@ -70,7 +72,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
     }
 
     @Override
-	public synchronized String toString() {
+    public synchronized String toString() {
         return file.getName() + " number = " + dataFileId + " , length = " + length;
     }
 
@@ -95,7 +97,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
     }
 
     @Override
-	public int compareTo(DataFile df) {
+    public int compareTo(DataFile df) {
         return dataFileId - df.dataFileId;
     }
 
@@ -112,5 +114,4 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
     public int hashCode() {
         return dataFileId;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 6a49d06..de68cf0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb.disk.journal;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.slf4j.Logger;
@@ -28,8 +27,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Optimized Store reader and updater. Single threaded and synchronous. Use in
  * conjunction with the DataFileAccessorPool of concurrent use.
- *
- *
  */
 final class DataFileAccessor {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 0ce647a..e2f173a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -54,33 +54,6 @@ class DataFileAppender implements FileAppender {
     protected boolean running;
     private Thread thread;
 
-    public static class WriteKey {
-        private final int file;
-        private final long offset;
-        private final int hash;
-
-        public WriteKey(Location item) {
-            file = item.getDataFileId();
-            offset = item.getOffset();
-            // TODO: see if we can build a better hash
-            hash = (int)(file ^ offset);
-        }
-
-        @Override
-        public int hashCode() {
-            return hash;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj instanceof WriteKey) {
-                WriteKey di = (WriteKey)obj;
-                return di.file == file && di.offset == offset;
-            }
-            return false;
-        }
-    }
-
     public class WriteBatch {
 
         public final DataFile dataFile;
@@ -206,7 +179,7 @@ class DataFileAppender implements FileAppender {
 
             while ( true ) {
                 if (nextWriteBatch == null) {
-                    DataFile file = journal.getCurrentWriteFile();
+                    DataFile file = journal.getOrCreateCurrentWriteFile();
                     if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
                         file = journal.rotateWriteFile();
                     }
@@ -287,9 +260,8 @@ class DataFileAppender implements FileAppender {
         DataFile dataFile = null;
         RecoverableRandomAccessFile file = null;
         WriteBatch wb = null;
-        try {
+        try (DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);) {
 
-            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
             while (true) {
 
                 // Block till we get a command.

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
index 6ed839f..e60096e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.store.kahadb.disk.journal;
 
+import java.io.IOException;
+
 import org.apache.activemq.util.ByteSequence;
 
-import java.io.IOException;
+public interface FileAppender extends AutoCloseable {
 
-public interface FileAppender {
     public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
     public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
 
@@ -28,5 +29,6 @@ public interface FileAppender {
 
     Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException;
 
+    @Override
     void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index a89f2a1..e186e19 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -159,6 +159,7 @@ public class Journal {
     protected boolean checkForCorruptionOnStartup;
     protected boolean enableAsyncDiskSync = true;
     private Timer timer;
+    private int nextDataFileId = 1;
 
     protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
@@ -220,7 +221,9 @@ public class Journal {
             }
         }
 
-        getCurrentWriteFile();
+        nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+
+        getOrCreateCurrentWriteFile();
 
         if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
             LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
@@ -345,6 +348,7 @@ public class Journal {
             LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
         }
     }
+
     private static byte[] bytes(String string) {
         try {
             return string.getBytes("UTF-8");
@@ -360,16 +364,17 @@ public class Journal {
 
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
-            while( true ) {
+            while (true) {
                 int size = checkBatchRecord(reader, location.getOffset());
-                if ( size>=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) {
-                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                 } else {
 
-                    // Perhaps it's just some corruption... scan through the file to find the next valid batch record.  We
+                    // Perhaps it's just some corruption... scan through the
+                    // file to find the next valid batch record. We
                     // may have subsequent valid batch records.
-                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
-                    if( nextOffset >=0 ) {
+                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
+                    if (nextOffset >= 0) {
                         Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                         LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                         dataFile.corruptedBlocks.add(sequence);
@@ -391,9 +396,9 @@ public class Journal {
             totalLength.addAndGet(dataFile.getLength() - existingLen);
         }
 
-        if( !dataFile.corruptedBlocks.isEmpty() ) {
+        if (!dataFile.corruptedBlocks.isEmpty()) {
             // Is the end of the data file corrupted?
-            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
+            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                 dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
             }
         }
@@ -407,19 +412,19 @@ public class Journal {
         ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
 
         int pos = 0;
-        while( true ) {
+        while (true) {
             pos = bs.indexOf(header, pos);
-            if( pos >= 0 ) {
-                return offset+pos;
+            if (pos >= 0) {
+                return offset + pos;
             } else {
                 // need to load the next data chunck in..
-                if( bs.length != data.length ) {
+                if (bs.length != data.length) {
                     // If we had a short read then we were at EOF
                     return -1;
                 }
-                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
+                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                 bs = new ByteSequence(data, 0, reader.read(offset, data));
-                pos=0;
+                pos = 0;
             }
         }
     }
@@ -431,34 +436,34 @@ public class Journal {
 
             reader.readFully(offset, controlRecord);
 
-            // Assert that it's  a batch record.
-            for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
-                if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
+            // Assert that it's a batch record.
+            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
+                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                     return -1;
                 }
             }
 
             int size = controlIs.readInt();
-            if( size > MAX_BATCH_SIZE ) {
+            if (size > MAX_BATCH_SIZE) {
                 return -1;
             }
 
-            if( isChecksum() ) {
+            if (isChecksum()) {
 
                 long expectedChecksum = controlIs.readLong();
-                if( expectedChecksum == 0 ) {
+                if (expectedChecksum == 0) {
                     // Checksuming was not enabled when the record was stored.
                     // we can't validate the record :(
                     return size;
                 }
 
                 byte data[] = new byte[size];
-                reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
+                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
 
                 Checksum checksum = new Adler32();
                 checksum.update(data, 0, data.length);
 
-                if( expectedChecksum!=checksum.getValue() ) {
+                if (expectedChecksum != checksum.getValue()) {
                     return -1;
                 }
             }
@@ -474,15 +479,22 @@ public class Journal {
         return totalLength.get();
     }
 
-    synchronized DataFile getCurrentWriteFile() throws IOException {
+    synchronized DataFile getOrCreateCurrentWriteFile() throws IOException {
         if (dataFiles.isEmpty()) {
             rotateWriteFile();
         }
-        return dataFiles.getTail();
+
+        DataFile current = dataFiles.getTail();
+
+        if (current != null) {
+            return current;
+        } else {
+            return rotateWriteFile();
+        }
     }
 
     synchronized DataFile rotateWriteFile() {
-        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+        int nextNum = nextDataFileId++;
         File file = getFile(nextNum);
         DataFile nextWriteFile = new DataFile(file, nextNum);
         fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
@@ -491,6 +503,20 @@ public class Journal {
         return nextWriteFile;
     }
 
+    public synchronized DataFile reserveDataFile() {
+        int nextNum = nextDataFileId++;
+        File file = getFile(nextNum);
+        DataFile reservedDataFile = new DataFile(file, nextNum);
+        fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
+        fileByFileMap.put(file, reservedDataFile);
+        if (dataFiles.isEmpty()) {
+            dataFiles.addLast(reservedDataFile);
+        } else {
+            dataFiles.getTail().linkBefore(reservedDataFile);
+        }
+        return reservedDataFile;
+    }
+
     public File getFile(int nextNum) {
         String fileName = filePrefix + nextNum + fileSuffix;
         File file = new File(directory, fileName);
@@ -517,10 +543,6 @@ public class Journal {
         return dataFile.getFile();
     }
 
-    private DataFile getNextDataFile(DataFile dataFile) {
-        return dataFile.getNext();
-    }
-
     public void close() throws IOException {
         synchronized (this) {
             if (!started) {
@@ -559,6 +581,7 @@ public class Journal {
             DataFile dataFile = i.next();
             result &= dataFile.delete();
         }
+
         totalLength.set(0);
         fileMap.clear();
         fileByFileMap.clear();
@@ -574,11 +597,11 @@ public class Journal {
     public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
         for (Integer key : files) {
             // Can't remove the data file (or subsequent files) that is currently being written to.
-            if( key >= lastAppendLocation.get().getDataFileId() ) {
+            if (key >= lastAppendLocation.get().getDataFileId()) {
                 continue;
             }
             DataFile dataFile = fileMap.get(key);
-            if( dataFile!=null ) {
+            if (dataFile != null) {
                 forceRemoveDataFile(dataFile);
             }
         }
@@ -607,7 +630,7 @@ public class Journal {
             LOG.debug("Successfully moved data file");
         } else {
             LOG.debug("Deleting data file: {}", dataFile);
-            if ( dataFile.delete() ) {
+            if (dataFile.delete()) {
                 LOG.debug("Discarded data file: {}", dataFile);
             } else {
                 LOG.warn("Failed to discard data file : {}", dataFile.getFile());
@@ -644,7 +667,7 @@ public class Journal {
             if (cur == null) {
                 if (location == null) {
                     DataFile head = dataFiles.getHead();
-                    if( head == null ) {
+                    if (head == null) {
                         return null;
                     }
                     cur = new Location();
@@ -667,7 +690,7 @@ public class Journal {
 
             // Did it go into the next file??
             if (dataFile.getLength() <= cur.getOffset()) {
-                dataFile = getNextDataFile(dataFile);
+                dataFile = dataFile.getNext();
                 if (dataFile == null) {
                     return null;
                 } else {
@@ -796,10 +819,35 @@ public class Journal {
         this.archiveDataLogs = archiveDataLogs;
     }
 
-    synchronized public Integer getCurrentDataFileId() {
-        if (dataFiles.isEmpty())
+    public synchronized DataFile getDataFileById(int dataFileId) {
+        if (dataFiles.isEmpty()) {
+            return null;
+        }
+
+        return fileMap.get(Integer.valueOf(dataFileId));
+    }
+
+    public synchronized DataFile getCurrentDataFile() {
+        if (dataFiles.isEmpty()) {
             return null;
-        return dataFiles.getTail().getDataFileId();
+        }
+
+        DataFile current = dataFiles.getTail();
+
+        if (current != null) {
+            return current;
+        } else {
+            return null;
+        }
+    }
+
+    public synchronized Integer getCurrentDataFileId() {
+        DataFile current = getCurrentDataFile();
+        if (current != null) {
+            return current.getDataFileId();
+        } else {
+            return null;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
new file mode 100644
index 0000000..3e3e090
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
+import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * File Appender instance that performs batched writes in the thread where the write is
+ * queued.  This appender does not honor the maxFileLength value in the journal as the
+ * files created here are out-of-band logs used for other purposes such as journal level
+ * compaction.
+ */
+public class TargetedDataFileAppender implements FileAppender {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TargetedDataFileAppender.class);
+
+    private final Journal journal;
+    private final DataFile target;
+    private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
+    private final int maxWriteBatchSize;
+
+    private boolean closed;
+    private boolean preallocate;
+    private WriteBatch nextWriteBatch;
+    private int statIdx = 0;
+    private int[] stats = new int[maxStat];
+
+    public class WriteBatch {
+
+        protected final int offset;
+
+        public final DataFile dataFile;
+        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
+        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+        public AtomicReference<IOException> exception = new AtomicReference<IOException>();
+
+        public WriteBatch(DataFile dataFile, int offset) {
+            this.dataFile = dataFile;
+            this.offset = offset;
+            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            this.size = Journal.BATCH_CONTROL_RECORD_SIZE;
+            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+        }
+
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
+            this(dataFile, offset);
+            append(write);
+        }
+
+        public boolean canAppend(Journal.WriteCommand write) {
+            int newSize = size + write.location.getSize();
+            if (newSize >= maxWriteBatchSize) {
+                return false;
+            }
+            return true;
+        }
+
+        public void append(Journal.WriteCommand write) throws IOException {
+            this.writes.addLast(write);
+            write.location.setDataFileId(dataFile.getDataFileId());
+            write.location.setOffset(offset + size);
+            int s = write.location.getSize();
+            size += s;
+            dataFile.incrementLength(s);
+            journal.addToTotalLength(s);
+        }
+    }
+
+    /**
+     * Construct a Store writer
+     */
+    public TargetedDataFileAppender(Journal journal, DataFile target) {
+        this.journal = journal;
+        this.target = target;
+        this.inflightWrites = this.journal.getInflightWrites();
+        this.maxWriteBatchSize = this.journal.getWriteBatchSize();
+    }
+
+    @Override
+    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+        checkClosed();
+
+        // Write the packet our internal buffer.
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
+
+        enqueueWrite(write);
+
+        if (sync) {
+            writePendingBatch();
+        }
+
+        return location;
+    }
+
+    @Override
+    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+        checkClosed();
+
+        // Write the packet our internal buffer.
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
+
+        enqueueWrite(write);
+
+        return location;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            if (nextWriteBatch != null) {
+                // force sync of current in-progress batched write.
+                LOG.debug("Close of targeted appender flushing last batch.");
+                writePendingBatch();
+            }
+
+            closed = true;
+        }
+    }
+
+    //----- Appender Configuration -------------------------------------------//
+
+    public boolean isPreallocate() {
+        return preallocate;
+    }
+
+    public void setPreallocate(boolean preallocate) {
+        this.preallocate = preallocate;
+    }
+
+    //----- Internal Implementation ------------------------------------------//
+
+    private void checkClosed() throws IOException {
+        if (closed) {
+            throw new IOException("The appender is clsoed");
+        }
+    }
+
+    private WriteBatch enqueueWrite(Journal.WriteCommand write) throws IOException {
+        while (true) {
+            if (nextWriteBatch == null) {
+                nextWriteBatch = new WriteBatch(target, target.getLength(), write);
+                break;
+            } else {
+                // Append to current batch if possible..
+                if (nextWriteBatch.canAppend(write)) {
+                    nextWriteBatch.append(write);
+                    break;
+                } else {
+                    // Flush current batch and start a new one.
+                    writePendingBatch();
+                    nextWriteBatch = null;
+                }
+            }
+        }
+
+        if (!write.sync) {
+            inflightWrites.put(new Journal.WriteKey(write.location), write);
+        }
+
+        return nextWriteBatch;
+    }
+
+    private void writePendingBatch() throws IOException {
+        DataFile dataFile = nextWriteBatch.dataFile;
+
+        try (RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
+             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);) {
+
+            // preallocate on first open of new file (length == 0) if configured to do so.
+            // NOTE: dataFile.length cannot be used because it is updated in enqueue
+            if (file.length() == 0L && isPreallocate()) {
+                journal.preallocateEntireJournalDataFile(file);
+            }
+
+            Journal.WriteCommand write = nextWriteBatch.writes.getHead();
+
+            // Write an empty batch control record.
+            buff.reset();
+            buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
+            buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
+            buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
+            buff.writeInt(0);
+            buff.writeLong(0);
+
+            while (write != null) {
+                buff.writeInt(write.location.getSize());
+                buff.writeByte(write.location.getType());
+                buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                write = write.getNext();
+            }
+
+            // append 'unset' next batch (5 bytes) so read can always find eof
+            buff.writeInt(0);
+            buff.writeByte(0);
+
+            ByteSequence sequence = buff.toByteSequence();
+
+            // Now we can fill in the batch control record properly.
+            buff.reset();
+            buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+            buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+            if (journal.isChecksum()) {
+                Checksum checksum = new Adler32();
+                checksum.update(sequence.getData(),
+                                sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
+                                sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+                buff.writeLong(checksum.getValue());
+            }
+
+            // Now do the 1 big write.
+            file.seek(nextWriteBatch.offset);
+            if (maxStat > 0) {
+                if (statIdx < maxStat) {
+                    stats[statIdx++] = sequence.getLength();
+                } else {
+                    long all = 0;
+                    for (; statIdx > 0;) {
+                        all += stats[--statIdx];
+                    }
+                    LOG.trace("Ave writeSize: {}", all / maxStat);
+                }
+            }
+
+            file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+
+            ReplicationTarget replicationTarget = journal.getReplicationTarget();
+            if (replicationTarget != null) {
+                replicationTarget.replicate(nextWriteBatch.writes.getHead().location, sequence, true);
+            }
+
+            file.sync();
+
+            signalDone(nextWriteBatch);
+        } catch (IOException e) {
+            LOG.info("Journal failed while writing at: {}", nextWriteBatch.offset);
+            throw e;
+        }
+    }
+
+    private void signalDone(WriteBatch writeBatch) {
+        // Now that the data is on disk, remove the writes from the in
+        // flight cache and signal any onComplete requests.
+        Journal.WriteCommand write = writeBatch.writes.getHead();
+        while (write != null) {
+            if (!write.sync) {
+                inflightWrites.remove(new Journal.WriteKey(write.location));
+            }
+
+            if (write.onComplete != null) {
+                try {
+                    write.onComplete.run();
+                } catch (Throwable e) {
+                    LOG.info("Add exception was raised while executing the run command for onComplete", e);
+                }
+            }
+
+            write = write.getNext();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
index 7147fd9..455a020 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,19 +16,18 @@
  */
 package org.apache.activemq.store.kahadb.disk.util;
 
-import org.apache.activemq.util.ByteSequence;
-
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UTFDataFormatException;
 
+import org.apache.activemq.util.ByteSequence;
+
 /**
  * Optimized ByteArrayInputStream that can be used more than once
- *
- *
  */
-public final class DataByteArrayInputStream extends InputStream implements DataInput {
+public final class DataByteArrayInputStream extends InputStream implements DataInput, AutoCloseable {
+
     private byte[] buf;
     private int pos;
     private int offset;
@@ -137,6 +136,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
      * @return the next byte of data, or <code>-1</code> if the end of the
      *         stream has been reached.
      */
+    @Override
     public int read() {
         return (pos < length) ? (buf[pos++] & 0xff) : -1;
     }
@@ -152,6 +152,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
      *         <code>-1</code> if there is no more data because the end of the
      *         stream has been reached.
      */
+    @Override
     public int read(byte b[], int off, int len) {
         if (b == null) {
             throw new NullPointerException();
@@ -174,18 +175,22 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
      * @return the number of bytes that can be read from the input stream
      *         without blocking.
      */
+    @Override
     public int available() {
         return length - pos;
     }
 
+    @Override
     public void readFully(byte[] b) {
         read(b, 0, b.length);
     }
 
+    @Override
     public void readFully(byte[] b, int off, int len) {
         read(b, off, len);
     }
 
+    @Override
     public int skipBytes(int n) {
         if (pos + n > length) {
             n = length - pos;
@@ -197,39 +202,47 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
         return n;
     }
 
+    @Override
     public boolean readBoolean() {
         return read() != 0;
     }
 
+    @Override
     public byte readByte() {
         return (byte)read();
     }
 
+    @Override
     public int readUnsignedByte() {
         return read();
     }
 
+    @Override
     public short readShort() {
         this.read(work, 0, 2);
         return (short) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
     }
 
+    @Override
     public int readUnsignedShort() {
         this.read(work, 0, 2);
-        return (int) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
+        return ((work[0] & 0xff) << 8) | (work[1] & 0xff);
     }
 
+    @Override
     public char readChar() {
         this.read(work, 0, 2);
         return (char) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
     }
 
+    @Override
     public int readInt() {
         this.read(work, 0, 4);
         return ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) |
                ((work[2] & 0xff) << 8) | (work[3] & 0xff);
     }
 
+    @Override
     public long readLong() {
         this.read(work, 0, 8);
 
@@ -241,14 +254,17 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
         return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
     }
 
+    @Override
     public float readFloat() throws IOException {
         return Float.intBitsToFloat(readInt());
     }
 
+    @Override
     public double readDouble() throws IOException {
         return Double.longBitsToDouble(readLong());
     }
 
+    @Override
     public String readLine() {
         int start = pos;
         while (pos < length) {
@@ -267,6 +283,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
         return new String(buf, start, pos);
     }
 
+    @Override
     public String readUTF() throws IOException {
         int length = readUnsignedShort();
         int endPos = pos + length;

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
index 469c853..d29b70f 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,21 +16,18 @@
  */
 package org.apache.activemq.store.kahadb.disk.util;
 
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.util.ByteSequence;
-
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.util.ByteSequence;
 
 /**
  * Optimized ByteArrayOutputStream
- *
- *
  */
-public class DataByteArrayOutputStream extends OutputStream implements DataOutput {
+public class DataByteArrayOutputStream extends OutputStream implements DataOutput, AutoCloseable {
     private static final int DEFAULT_SIZE = PageFile.DEFAULT_PAGE_SIZE;
     protected byte buf[];
     protected int pos;
@@ -88,6 +85,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
      * @param b the byte to be written.
      * @throws IOException
      */
+    @Override
     public void write(int b) throws IOException {
         int newcount = pos + 1;
         ensureEnoughBuffer(newcount);
@@ -105,6 +103,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
      * @param len the number of bytes to write.
      * @throws IOException
      */
+    @Override
     public void write(byte b[], int off, int len) throws IOException {
         if (len == 0) {
             return;
@@ -146,18 +145,21 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
         return pos;
     }
 
+    @Override
     public void writeBoolean(boolean v) throws IOException {
         ensureEnoughBuffer(pos + 1);
         buf[pos++] = (byte)(v ? 1 : 0);
         onWrite();
     }
 
+    @Override
     public void writeByte(int v) throws IOException {
         ensureEnoughBuffer(pos + 1);
         buf[pos++] = (byte)(v >>> 0);
         onWrite();
     }
 
+    @Override
     public void writeShort(int v) throws IOException {
         ensureEnoughBuffer(pos + 2);
         buf[pos++] = (byte)(v >>> 8);
@@ -165,6 +167,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
         onWrite();
     }
 
+    @Override
     public void writeChar(int v) throws IOException {
         ensureEnoughBuffer(pos + 2);
         buf[pos++] = (byte)(v >>> 8);
@@ -172,6 +175,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
         onWrite();
     }
 
+    @Override
     public void writeInt(int v) throws IOException {
         ensureEnoughBuffer(pos + 4);
         buf[pos++] = (byte)(v >>> 24);
@@ -181,6 +185,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
         onWrite();
     }
 
+    @Override
     public void writeLong(long v) throws IOException {
         ensureEnoughBuffer(pos + 8);
         buf[pos++] = (byte)(v >>> 56);
@@ -194,14 +199,17 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
         onWrite();
     }
 
+    @Override
     public void writeFloat(float v) throws IOException {
         writeInt(Float.floatToIntBits(v));
     }
 
+    @Override
     public void writeDouble(double v) throws IOException {
         writeLong(Double.doubleToLongBits(v));
     }
 
+    @Override
     public void writeBytes(String s) throws IOException {
         int length = s.length();
         for (int i = 0; i < length; i++) {
@@ -209,6 +217,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
         }
     }
 
+    @Override
     public void writeChars(String s) throws IOException {
         int length = s.length();
         for (int i = 0; i < length; i++) {
@@ -218,6 +227,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
         }
     }
 
+    @Override
     public void writeUTF(String str) throws IOException {
         int strlen = str.length();
         int encodedsize = 0;

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/main/proto/journal-data.proto
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto
index 01607a5..2dd97b9 100644
--- a/activemq-kahadb-store/src/main/proto/journal-data.proto
+++ b/activemq-kahadb-store/src/main/proto/journal-data.proto
@@ -37,6 +37,7 @@ enum KahaEntryType {
   KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13;
   KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14;
   KAHA_DESTROY_SCHEDULER_COMMAND = 15;
+  KAHA_REWRITTEN_DATA_FILE_COMMAND = 16;
 }
 
 message KahaTraceCommand {
@@ -240,6 +241,17 @@ message KahaDestroySchedulerCommand {
   required string scheduler=1;
 }
 
+message KahaRewrittenDataFileCommand {
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRewrittenDataFileCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required int32 sourceDataFileId = 1;
+  optional int32 rewriteType = 2;
+  optional bool skipIfSourceExists = 3 [default = true];
+
+}
+
 // TODO things to ponder
 // should we move more message fields
 // that are set by the sender (and rarely required by the broker

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
new file mode 100644
index 0000000..a6cdb9e
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the single threaded DataFileAppender class.
+ */
+public class TargetedDataFileAppenderTest {
+
+    private Journal dataManager;
+    private TargetedDataFileAppender appender;
+    private DataFile dataFile;
+    private File dir;
+
+    @Before
+    public void setUp() throws Exception {
+        dir = new File("target/tests/TargetedDataFileAppenderTest");
+        dir.mkdirs();
+        dataManager = new Journal();
+        dataManager.setDirectory(dir);
+        dataManager.start();
+
+        dataFile = dataManager.reserveDataFile();
+        appender = new TargetedDataFileAppender(dataManager, dataFile);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        dataManager.close();
+        IOHelper.delete(dir);
+    }
+
+    @Test
+    public void testWritesAreBatched() throws Exception {
+        final int iterations = 10;
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i = 0; i < iterations; i++) {
+            appender.storeItem(data, Journal.USER_RECORD_TYPE, false);
+        }
+
+        assertTrue("Data file should not be empty", dataFile.getLength() > 0);
+        assertTrue("Data file should be empty", dataFile.getFile().length() == 0);
+
+        appender.close();
+
+        // at this point most probably dataManager.getInflightWrites().size() >= 0
+        // as the Thread created in DataFileAppender.enqueue() may not have caught up.
+        assertTrue("Data file should not be empty", dataFile.getLength() > 0);
+        assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
+    }
+
+    @Test
+    public void testBatchWritesCompleteAfterClose() throws Exception {
+        final int iterations = 10;
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i = 0; i < iterations; i++) {
+            appender.storeItem(data, Journal.USER_RECORD_TYPE, false);
+        }
+
+        appender.close();
+
+        // at this point most probably dataManager.getInflightWrites().size() >= 0
+        // as the Thread created in DataFileAppender.enqueue() may not have caught up.
+        assertTrue("Data file should not be empty", dataFile.getLength() > 0);
+        assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
+    }
+
+    @Test
+    public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i = 0; i < iterations; i++) {
+            appender.storeItem(data, Journal.USER_RECORD_TYPE, new Runnable() {
+                @Override
+                public void run() {
+                    latch.countDown();
+                }
+            });
+        }
+
+        appender.close();
+
+        // at this point most probably dataManager.getInflightWrites().size() >= 0
+        // as the Thread created in DataFileAppender.enqueue() may not have caught up.
+        assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS));
+        assertTrue("Data file should not be empty", dataFile.getLength() > 0);
+        assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/946e62d7/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
index 99a3e9e..f1797cb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
@@ -87,9 +87,7 @@ public class TransactedStoreUsageSuspendResumeTest {
                 do {
                     Message message = consumer.receive(5000);
                     if (message != null) {
-                        if ((messagesReceivedCountDown.getCount() % (MAX_MESSAGES / 5)) == 0) {
-                            session.commit();
-                        }
+                        session.commit();
                         messagesReceivedCountDown.countDown();
                     }
                     if (messagesReceivedCountDown.getCount() % 500 == 0) {


Mime
View raw message