activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5603 - add preallocationScope=entire_journal_async that will preallocate a journal in advance of use to avoid latency jitter on journal rotation. Added none option to disable preallocation
Date Fri, 29 Apr 2016 16:00:55 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 3c342ffce -> 62bdbb0db


https://issues.apache.org/jira/browse/AMQ-5603 - add preallocationScope=entire_journal_async that will preallocate a journal in advance of use to avoid latency jitter on journal rotation. Added none option to disable preallocation


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/62bdbb0d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/62bdbb0d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/62bdbb0d

Branch: refs/heads/master
Commit: 62bdbb0db5dc4354f0e00fd5259b3db53eb1432d
Parents: 3c342ff
Author: gtully <gary.tully@gmail.com>
Authored: Fri Apr 29 16:57:03 2016 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Apr 29 16:57:28 2016 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  45 ++-
 .../store/kahadb/disk/journal/DataFile.java     |  11 +
 .../kahadb/disk/journal/DataFileAppender.java   |  38 +-
 .../store/kahadb/disk/journal/Journal.java      | 391 +++++++++++++------
 .../disk/journal/TargetedDataFileAppender.java  |   8 +-
 .../JournalCorruptionEofIndexRecoveryTest.java  |   5 +-
 .../JournalCorruptionIndexRecoveryTest.java     |   2 +
 .../store/kahadb/disk/journal/JournalTest.java  |   1 +
 .../PreallocationJournalLatencyTest.java        |  15 +-
 .../journal/TargetedDataFileAppenderTest.java   |   1 +
 .../activemq/bugs/AMQ2584ConcurrentDlqTest.java |   2 +
 .../org/apache/activemq/bugs/AMQ3120Test.java   |   4 +-
 .../org/apache/activemq/bugs/AMQ4323Test.java   |   4 +-
 .../store/kahadb/KahaDBIndexLocationTest.java   |   2 +-
 14 files changed, 339 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8bb9491..3e754f7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -257,7 +257,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
     int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
-    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
+    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name();
     private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
 
     protected AtomicBoolean opened = new AtomicBoolean();
@@ -1860,32 +1860,37 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         @Override
         public void run() {
+
+            int journalToAdvance = -1;
+            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
+
             // Lock index to capture the ackMessageFileMap data
             indexLock.writeLock().lock();
 
-            // Map keys might not be sorted, find the earliest log file to forward acks
-            // from and move only those, future cycles can chip away at more as needed.
-            // We won't move files that are themselves rewritten on a previous compaction.
-            List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
-            Collections.sort(journalFileIds);
-            int journalToAdvance = -1;
-            for (Integer journalFileId : journalFileIds) {
-                DataFile current = journal.getDataFileById(journalFileId);
-                if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
-                    journalToAdvance = journalFileId;
-                    break;
+            try {
+                // Map keys might not be sorted, find the earliest log file to forward acks
+                // from and move only those, future cycles can chip away at more as needed.
+                // We won't move files that are themselves rewritten on a previous compaction.
+                List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+                Collections.sort(journalFileIds);
+                for (Integer journalFileId : journalFileIds) {
+                    DataFile current = journal.getDataFileById(journalFileId);
+                    if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+                        journalToAdvance = journalFileId;
+                        break;
+                    }
                 }
-            }
 
-            // Check if we found one, or if we only found the current file being written to.
-            if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
-                return;
-            }
+                // Check if we found one, or if we only found the current file being written to.
+                if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+                    return;
+                }
 
-            Set<Integer> journalLogsReferenced =
-                new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
+                journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
 
-            indexLock.writeLock().unlock();
+            } finally {
+                indexLock.writeLock().unlock();
+            }
 
             try {
                 // Background rewrite of the old acks

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index 126d82b..5b96adf 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -36,6 +36,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
     protected volatile int length;
     protected int typeCode = STANDARD_LOG_FILE;
     protected final SequenceSet corruptedBlocks = new SequenceSet();
+    protected RecoverableRandomAccessFile appendRandomAccessFile;
 
     DataFile(File file, int number) {
         this.file = file;
@@ -76,12 +77,22 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
         return file.getName() + " number = " + dataFileId + " , length = " + length;
     }
 
+    public synchronized RecoverableRandomAccessFile appendRandomAccessFile() throws IOException {
+        if (appendRandomAccessFile == null) {
+            appendRandomAccessFile = new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw");
+        }
+        return appendRandomAccessFile;
+    }
+
     public synchronized RecoverableRandomAccessFile openRandomAccessFile() throws IOException {
         return new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw");
     }
 
     public synchronized void closeRandomAccessFile(RecoverableRandomAccessFile file) throws IOException {
         file.close();
+        if (file == appendRandomAccessFile) {
+            appendRandomAccessFile = null;
+        }
     }
 
     public synchronized boolean delete() throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index e2f173a..792431c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -31,6 +31,9 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.store.kahadb.disk.journal.Journal.EMPTY_BATCH_CONTROL_RECORD;
+import static org.apache.activemq.store.kahadb.disk.journal.Journal.RECORD_HEAD_SPACE;
+
 /**
  * An optimized writer to do batch appends to a data file. This object is thread
  * safe and gains throughput as you increase the number of concurrent writes it
@@ -110,7 +113,7 @@ class DataFileAppender implements FileAppender {
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
 
         // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+        int size = data.getLength() + RECORD_HEAD_SPACE;
 
         final Location location = new Location();
         location.setSize(size);
@@ -138,7 +141,7 @@ class DataFileAppender implements FileAppender {
     @Override
     public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
         // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+        int size = data.getLength() + RECORD_HEAD_SPACE;
 
         final Location location = new Location();
         location.setSize(size);
@@ -179,12 +182,7 @@ class DataFileAppender implements FileAppender {
 
             while ( true ) {
                 if (nextWriteBatch == null) {
-                    DataFile file = journal.getOrCreateCurrentWriteFile();
-                    if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
-                        file = journal.rotateWriteFile();
-                    }
-
-
+                    DataFile file = journal.getCurrentDataFile(write.location.getSize());
                     nextWriteBatch = newWriteBatch(write, file);
                     enqueueMutex.notifyAll();
                     break;
@@ -285,23 +283,14 @@ class DataFileAppender implements FileAppender {
                         dataFile.closeRandomAccessFile(file);
                     }
                     dataFile = wb.dataFile;
-                    file = dataFile.openRandomAccessFile();
-                    // pre allocate on first open of new file (length==0)
-                    // note dataFile.length cannot be used because it is updated in enqueue
-                    if (file.length() == 0l) {
-                        journal.preallocateEntireJournalDataFile(file);
-                    }
+                    file = dataFile.appendRandomAccessFile();
                 }
 
                 Journal.WriteCommand write = wb.writes.getHead();
 
                 // Write an empty batch control record.
                 buff.reset();
-                buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
-                buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
-                buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
-                buff.writeInt(0);
-                buff.writeLong(0);
+                buff.write(EMPTY_BATCH_CONTROL_RECORD);
 
                 boolean forceToDisk = false;
                 while (write != null) {
@@ -312,19 +301,18 @@ class DataFileAppender implements FileAppender {
                     write = write.getNext();
                 }
 
-                // append 'unset' next batch (5 bytes) so read can always find eof
-                buff.writeInt(0);
-                buff.writeByte(0);
+                // append 'unset', zero length next batch so read can always find eof
+                buff.write(Journal.EOF_RECORD);
 
                 ByteSequence sequence = buff.toByteSequence();
 
                 // Now we can fill in the batch control record properly.
                 buff.reset();
-                buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
-                buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+                buff.skip(RECORD_HEAD_SPACE + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+                buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
                 if( journal.isChecksum() ) {
                     Checksum checksum = new Adler32();
-                    checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+                    checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE-Journal.EOF_RECORD.length);
                     buff.writeLong(checksum.getValue());
                 }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index da0d5b4..182a3d7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -23,19 +23,16 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeMap;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
@@ -43,13 +40,13 @@ import java.util.zip.Checksum;
 
 import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
-import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
 import org.apache.activemq.store.kahadb.disk.util.Sequence;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +70,12 @@ public class Journal {
     public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
     public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
     public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
+    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
+    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
+    public static final byte EOF_EOT = '4';
+    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
+
+    private ScheduledExecutorService scheduler;
 
     // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
     public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
@@ -103,7 +106,9 @@ public class Journal {
     }
 
     public enum PreallocationScope {
-        ENTIRE_JOURNAL;
+        ENTIRE_JOURNAL,
+        ENTIRE_JOURNAL_ASYNC,
+        NONE;
     }
 
     private static byte[] createBatchControlRecordHeader() {
@@ -119,13 +124,39 @@ public class Journal {
         }
     }
 
+    private static byte[] createEmptyBatchControlRecordHeader() {
+        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
+            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
+            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
+            os.write(BATCH_CONTROL_RECORD_MAGIC);
+            os.writeInt(0);
+            os.writeLong(0l);
+            ByteSequence sequence = os.toByteSequence();
+            sequence.compact();
+            return sequence.getData();
+        } catch (IOException e) {
+            throw new RuntimeException("Could not create empty batch control record header.", e);
+        }
+    }
+
+    private static byte[] createEofBatchAndLocationRecord() {
+        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
+            os.writeInt(EOF_INT);
+            os.writeByte(EOF_EOT);
+            ByteSequence sequence = os.toByteSequence();
+            sequence.compact();
+            return sequence.getData();
+        } catch (IOException e) {
+            throw new RuntimeException("Could not create eof header.", e);
+        }
+    }
+
     public static final String DEFAULT_DIRECTORY = ".";
     public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
     public static final String DEFAULT_FILE_PREFIX = "db-";
     public static final String DEFAULT_FILE_SUFFIX = ".log";
     public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
     public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
-    public static final int PREFERED_DIFF = 1024 * 512;
     public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
 
     private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
@@ -151,18 +182,21 @@ public class Journal {
     protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
 
     protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
-    protected Runnable cleanupTask;
+    protected ScheduledFuture cleanupTask;
     protected AtomicLong totalLength = new AtomicLong();
     protected boolean archiveDataLogs;
     private ReplicationTarget replicationTarget;
     protected boolean checksum;
     protected boolean checkForCorruptionOnStartup;
     protected boolean enableAsyncDiskSync = true;
-    private Timer timer;
     private int nextDataFileId = 1;
+    private Object dataFileIdLock = new Object();
+    private  final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
+    private volatile DataFile nextDataFile;
 
-    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
+    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL_ASYNC;
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
+    private File osKernelCopyTemplateFile = null;
 
     public interface DataFileRemovedListener {
         void fileRemoved(DataFile datafile);
@@ -204,13 +238,15 @@ public class Journal {
 
             // Sort the list so that we can link the DataFiles together in the
             // right order.
-            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
             Collections.sort(l);
             for (DataFile df : l) {
                 if (df.getLength() == 0) {
                     // possibly the result of a previous failed write
                     LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                     continue;
+                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
+                    continue;
                 }
                 dataFiles.addLast(df);
                 fileByFileMap.put(df.getFile(), df);
@@ -221,9 +257,31 @@ public class Journal {
             }
         }
 
-        nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
+            // create a template file that will be used to pre-allocate the journal files
+            if (osKernelCopyTemplateFile == null) {
+                osKernelCopyTemplateFile = createJournalTemplateFile();
+            }
+        }
 
-        getOrCreateCurrentWriteFile();
+        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread schedulerThread = new Thread(r);
+                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
+                schedulerThread.setDaemon(true);
+                return schedulerThread;
+            }
+        });
+
+        // init current write file
+        if (dataFiles.isEmpty()) {
+            nextDataFileId = 1;
+            rotateWriteFile();
+        } else {
+            currentDataFile.set(dataFiles.getTail());
+            nextDataFileId = currentDataFile.get().dataFileId + 1;
+        }
 
         if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
             LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
@@ -239,23 +297,20 @@ public class Journal {
             totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
         }
 
-        cleanupTask = new Runnable() {
+        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 cleanup();
             }
-        };
+        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
 
-        this.timer = new Timer("KahaDB Scheduler", true);
-        TimerTask task = new SchedulerTimerTask(cleanupTask);
-        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
         long end = System.currentTimeMillis();
         LOG.trace("Startup took: "+(end-start)+" ms");
     }
 
     public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
 
-        if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
+        if (PreallocationScope.NONE != preallocationScope) {
 
             if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                 doPreallocationKernelCopy(file);
@@ -266,58 +321,68 @@ public class Journal {
             } else {
                 doPreallocationSparseFile(file);
             }
-        } else {
-            LOG.info("Using journal preallocation scope of batch allocation");
         }
     }
 
     private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
+        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
         try {
-            file.seek(maxFileLength - 1);
-            file.write((byte)0x00);
+            FileChannel channel = file.getChannel();
+            channel.position(0);
+            channel.write(journalEof);
+            channel.position(maxFileLength - 5);
+            journalEof.rewind();
+            channel.write(journalEof);
+            channel.force(false);
+            channel.position(0);
+        } catch (ClosedByInterruptException ignored) {
+            LOG.trace("Could not preallocate journal file with sparse file", ignored);
         } catch (IOException e) {
-            LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e);
+            LOG.error("Could not preallocate journal file with sparse file", e);
         }
     }
 
     private void doPreallocationZeros(RecoverableRandomAccessFile file) {
         ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
-
+        buffer.put(EOF_RECORD);
+        buffer.rewind();
         try {
             FileChannel channel = file.getChannel();
             channel.write(buffer);
             channel.force(false);
             channel.position(0);
+        } catch (ClosedByInterruptException ignored) {
+            LOG.trace("Could not preallocate journal file with zeros", ignored);
         } catch (IOException e) {
-            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
+            LOG.error("Could not preallocate journal file with zeros", e);
         }
     }
 
     private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
-        // create a template file that will be used to pre-allocate the journal files
-        File templateFile = createJournalTemplateFile();
-
-        RandomAccessFile templateRaf = null;
         try {
-            templateRaf = new RandomAccessFile(templateFile, "rw");
-            templateRaf.setLength(maxFileLength);
-            templateRaf.getChannel().force(true);
+            RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw");
             templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
             templateRaf.close();
-            templateFile.delete();
+        } catch (ClosedByInterruptException ignored) {
+            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
         } catch (FileNotFoundException e) {
-            LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e);
+            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
         } catch (IOException e) {
-            LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e);
+            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
         }
     }
 
     private File createJournalTemplateFile() {
         String fileName = "db-log.template";
         File rc = new File(directory, fileName);
-        if (rc.exists()) {
-            LOG.trace("deleting journal template file because it already exists...");
-            rc.delete();
+        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw");) {
+            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
+            templateRaf.setLength(maxFileLength);
+            templateRaf.getChannel().force(true);
+        } catch (FileNotFoundException e) {
+            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
+        } catch (IOException e) {
+            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
         }
         return rc;
     }
@@ -325,6 +390,8 @@ public class Journal {
     private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
 
         ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
+        buffer.put(EOF_RECORD);
+        buffer.rewind();
 
         try {
             FileChannel channel = file.getChannel();
@@ -354,6 +421,24 @@ public class Journal {
         }
     }
 
+    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
+        int firstBatchRecordSize = -1;
+        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
+            Location location = new Location();
+            location.setDataFileId(dataFile.getDataFileId());
+            location.setOffset(0);
+
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
+            } catch (Exception ignored) {
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+        }
+        return firstBatchRecordSize == 0;
+    }
+
     protected Location recoveryCheck(DataFile dataFile) throws IOException {
         Location location = new Location();
         location.setDataFileId(dataFile.getDataFileId());
@@ -364,6 +449,10 @@ public class Journal {
             while (true) {
                 int size = checkBatchRecord(reader, location.getOffset());
                 if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+                    if (size == 0) {
+                        // eof batch record
+                        break;
+                    }
                     location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                 } else {
 
@@ -433,6 +522,12 @@ public class Journal {
 
             reader.readFully(offset, controlRecord);
 
+            // check for journal eof
+            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
+                // eof batch
+                return 0;
+            }
+
             // Assert that it's a batch record.
             for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                 if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
@@ -476,42 +571,67 @@ public class Journal {
         return totalLength.get();
     }
 
-    synchronized DataFile getOrCreateCurrentWriteFile() throws IOException {
-        if (dataFiles.isEmpty()) {
-            rotateWriteFile();
+    private void rotateWriteFile() throws IOException {
+       synchronized (dataFileIdLock) {
+            DataFile dataFile = nextDataFile;
+            if (dataFile == null) {
+                dataFile = newDataFile();
+            }
+            synchronized (currentDataFile) {
+                fileMap.put(dataFile.getDataFileId(), dataFile);
+                fileByFileMap.put(dataFile.getFile(), dataFile);
+                dataFiles.addLast(dataFile);
+                currentDataFile.set(dataFile);
+            }
+            nextDataFile = null;
         }
-
-        DataFile current = dataFiles.getTail();
-
-        if (current != null) {
-            return current;
-        } else {
-            return rotateWriteFile();
+        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
+            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
         }
     }
 
-    synchronized DataFile rotateWriteFile() {
+    private Runnable preAllocateNextDataFileTask = new Runnable() {
+        @Override
+        public void run() {
+            if (nextDataFile == null) {
+                synchronized (dataFileIdLock){
+                    try {
+                        nextDataFile = newDataFile();
+                    } catch (IOException e) {
+                        LOG.warn("Failed to proactively allocate data file", e);
+                    }
+                }
+            }
+        }
+    };
+
+    private volatile Future preAllocateNextDataFileFuture;
+
+    private DataFile newDataFile() throws IOException {
         int nextNum = nextDataFileId++;
         File file = getFile(nextNum);
         DataFile nextWriteFile = new DataFile(file, nextNum);
-        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
-        fileByFileMap.put(file, nextWriteFile);
-        dataFiles.addLast(nextWriteFile);
+        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
         return nextWriteFile;
     }
 
-    public synchronized DataFile reserveDataFile() {
-        int nextNum = nextDataFileId++;
-        File file = getFile(nextNum);
-        DataFile reservedDataFile = new DataFile(file, nextNum);
-        fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
-        fileByFileMap.put(file, reservedDataFile);
-        if (dataFiles.isEmpty()) {
-            dataFiles.addLast(reservedDataFile);
-        } else {
-            dataFiles.getTail().linkBefore(reservedDataFile);
+
+    public DataFile reserveDataFile() {
+        synchronized (dataFileIdLock) {
+            int nextNum = nextDataFileId++;
+            File file = getFile(nextNum);
+            DataFile reservedDataFile = new DataFile(file, nextNum);
+            synchronized (currentDataFile) {
+                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
+                fileByFileMap.put(file, reservedDataFile);
+                if (dataFiles.isEmpty()) {
+                    dataFiles.addLast(reservedDataFile);
+                } else {
+                    dataFiles.getTail().linkBefore(reservedDataFile);
+                }
+            }
+            return reservedDataFile;
         }
-        return reservedDataFile;
     }
 
     public File getFile(int nextNum) {
@@ -520,24 +640,17 @@ public class Journal {
         return file;
     }
 
-    synchronized DataFile getDataFile(Location item) throws IOException {
+    DataFile getDataFile(Location item) throws IOException {
         Integer key = Integer.valueOf(item.getDataFileId());
-        DataFile dataFile = fileMap.get(key);
-        if (dataFile == null) {
-            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
-            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
+        DataFile dataFile = null;
+        synchronized (currentDataFile) {
+            dataFile = fileMap.get(key);
         }
-        return dataFile;
-    }
-
-    synchronized File getFile(Location item) throws IOException {
-        Integer key = Integer.valueOf(item.getDataFileId());
-        DataFile dataFile = fileMap.get(key);
         if (dataFile == null) {
             LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
             throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
         }
-        return dataFile.getFile();
+        return dataFile;
     }
 
     public void close() throws IOException {
@@ -545,14 +658,16 @@ public class Journal {
             if (!started) {
                 return;
             }
-            if (this.timer != null) {
-                this.timer.cancel();
+            cleanupTask.cancel(true);
+            if (preAllocateNextDataFileFuture != null) {
+                preAllocateNextDataFileFuture.cancel(true);
             }
+            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
             accessorPool.close();
         }
         // the appender can be calling back to to the journal blocking a close AMQ-5620
         appender.close();
-        synchronized (this) {
+        synchronized (currentDataFile) {
             fileMap.clear();
             fileByFileMap.clear();
             dataFiles.clear();
@@ -579,37 +694,52 @@ public class Journal {
             result &= dataFile.delete();
         }
 
-        totalLength.set(0);
-        fileMap.clear();
-        fileByFileMap.clear();
-        lastAppendLocation.set(null);
-        dataFiles = new LinkedNodeList<DataFile>();
+        if (preAllocateNextDataFileFuture != null) {
+            preAllocateNextDataFileFuture.cancel(true);
+        }
+        synchronized (dataFileIdLock) {
+            if (nextDataFile != null) {
+                nextDataFile.delete();
+                nextDataFile = null;
+            }
+        }
 
+        totalLength.set(0);
+        synchronized (currentDataFile) {
+            fileMap.clear();
+            fileByFileMap.clear();
+            lastAppendLocation.set(null);
+            dataFiles = new LinkedNodeList<DataFile>();
+        }
         // reopen open file handles...
         accessorPool = new DataFileAccessorPool(this);
         appender = new DataFileAppender(this);
         return result;
     }
 
-    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
+    public void removeDataFiles(Set<Integer> files) throws IOException {
         for (Integer key : files) {
             // Can't remove the data file (or subsequent files) that is currently being written to.
             if (key >= lastAppendLocation.get().getDataFileId()) {
                 continue;
             }
-            DataFile dataFile = fileMap.get(key);
+            DataFile dataFile = null;
+            synchronized (currentDataFile) {
+                dataFile = fileMap.remove(key);
+                if (dataFile != null) {
+                    fileByFileMap.remove(dataFile.getFile());
+                    dataFile.unlink();
+                }
+            }
             if (dataFile != null) {
                 forceRemoveDataFile(dataFile);
             }
         }
     }
 
-    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
+    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
         accessorPool.disposeDataFileAccessors(dataFile);
-        fileByFileMap.remove(dataFile.getFile());
-        fileMap.remove(dataFile.getDataFileId());
         totalLength.addAndGet(-dataFile.getLength());
-        dataFile.unlink();
         if (archiveDataLogs) {
             File directoryArchive = getDirectoryArchive();
             if (directoryArchive.exists()) {
@@ -657,13 +787,15 @@ public class Journal {
         return directory.toString();
     }
 
-    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
-
+    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
         Location cur = null;
         while (true) {
             if (cur == null) {
                 if (location == null) {
-                    DataFile head = dataFiles.getHead();
+                    DataFile head = null;
+                    synchronized (currentDataFile) {
+                        head = dataFiles.getHead();
+                    }
                     if (head == null) {
                         return null;
                     }
@@ -687,7 +819,9 @@ public class Journal {
 
             // Did it go into the next file??
             if (dataFile.getLength() <= cur.getOffset()) {
-                dataFile = dataFile.getNext();
+                synchronized (currentDataFile) {
+                    dataFile = dataFile.getNext();
+                }
                 if (dataFile == null) {
                     return null;
                 } else {
@@ -708,9 +842,14 @@ public class Journal {
             if (corruptedRange != null) {
                 // skip corruption
                 cur.setSize((int) corruptedRange.range());
-            } else if (cur.getType() == 0) {
+            } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT ||
+                    (cur.getType() == 0 && cur.getSize() == 0)) {
                 // eof - jump to next datafile
-                cur.setOffset(maxFileLength);
+                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
+                // replay of existing journals
+                // possibly journal is larger than maxFileLength after config change
+                cur.setSize(EOF_RECORD.length);
+                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
             } else if (cur.getType() == USER_RECORD_TYPE) {
                 // Only return user records.
                 return cur;
@@ -718,7 +857,7 @@ public class Journal {
         }
     }
 
-    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
+    public ByteSequence read(Location location) throws IOException, IllegalStateException {
         DataFile dataFile = getDataFile(location);
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         ByteSequence rc = null;
@@ -816,34 +955,24 @@ public class Journal {
         this.archiveDataLogs = archiveDataLogs;
     }
 
-    public synchronized DataFile getDataFileById(int dataFileId) {
-        if (dataFiles.isEmpty()) {
-            return null;
+    public DataFile getDataFileById(int dataFileId) {
+        synchronized (currentDataFile) {
+            return fileMap.get(Integer.valueOf(dataFileId));
         }
-
-        return fileMap.get(Integer.valueOf(dataFileId));
     }
 
-    public synchronized DataFile getCurrentDataFile() {
-        if (dataFiles.isEmpty()) {
-            return null;
-        }
-
-        DataFile current = dataFiles.getTail();
-
-        if (current != null) {
-            return current;
-        } else {
-            return null;
+    public DataFile getCurrentDataFile(int capacity) throws IOException {
+        synchronized (currentDataFile) {
+            if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
+                rotateWriteFile();
+            }
+            return currentDataFile.get();
         }
     }
 
-    public synchronized Integer getCurrentDataFileId() {
-        DataFile current = getCurrentDataFile();
-        if (current != null) {
-            return current.getDataFileId();
-        } else {
-            return null;
+    public Integer getCurrentDataFileId() {
+        synchronized (currentDataFile) {
+            return currentDataFile.get().getDataFileId();
         }
     }
 
@@ -853,11 +982,15 @@ public class Journal {
      * @return files currently being used
      */
     public Set<File> getFiles() {
-        return fileByFileMap.keySet();
+        synchronized (currentDataFile) {
+            return fileByFileMap.keySet();
+        }
     }
 
-    public synchronized Map<Integer, DataFile> getFileMap() {
-        return new TreeMap<Integer, DataFile>(fileMap);
+    public Map<Integer, DataFile> getFileMap() {
+        synchronized (currentDataFile) {
+            return new TreeMap<Integer, DataFile>(fileMap);
+        }
     }
 
     public long getDiskSize() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
index 3e3e090..a80328f 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
@@ -227,20 +227,18 @@ public class TargetedDataFileAppender implements FileAppender {
             }
 
             // append 'unset' next batch (5 bytes) so read can always find eof
-            buff.writeInt(0);
-            buff.writeByte(0);
-
+            buff.write(Journal.EOF_RECORD);
             ByteSequence sequence = buff.toByteSequence();
 
             // Now we can fill in the batch control record properly.
             buff.reset();
             buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
-            buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+            buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
             if (journal.isChecksum()) {
                 Checksum checksum = new Adler32();
                 checksum.update(sequence.getData(),
                                 sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
-                                sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+                                sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
                 buff.writeLong(checksum.getValue());
             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index cf60a08..faf0022 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -128,8 +128,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
         adapter.setCheckForCorruptJournalFiles(true);
         adapter.setIgnoreMissingJournalfiles(ignoreMissingJournalFiles);
 
-        adapter.setPreallocationStrategy("zeros");
-        adapter.setPreallocationScope("entire_journal");
+        adapter.setPreallocationStrategy(Journal.PreallocationStrategy.ZEROS.name());
+        adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
     }
 
     @After
@@ -259,6 +259,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         corruptOrderIndex(id, size);
 
         randomAccessFile.getChannel().force(true);
+        dataFile.closeRandomAccessFile(randomAccessFile);
     }
 
     private void corruptBatchEndEof(int id) throws Exception{

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
index 84c2ab5..2e34686 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
@@ -114,6 +114,8 @@ public class JournalCorruptionIndexRecoveryTest {
 
         adapter.setCheckForCorruptJournalFiles(true);
         adapter.setIgnoreMissingJournalfiles(true);
+
+        adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
index 3c65814..987c2d3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.File;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
index d6c64f4..32e2f0c 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
@@ -39,10 +39,13 @@ public class PreallocationJournalLatencyTest {
 
         TimeStatisticImpl sparse = executeTest(Journal.PreallocationStrategy.SPARSE_FILE.name());
         TimeStatisticImpl chunked_zeros = executeTest(Journal.PreallocationStrategy.CHUNKED_ZEROS.name());
-        TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name());
+        //TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name());
+        TimeStatisticImpl kernel = executeTest(Journal.PreallocationStrategy.OS_KERNEL_COPY.name());
+
         LOG.info("  sparse: " + sparse);
         LOG.info(" chunked: " + chunked_zeros);
-        LOG.info("   zeros: " + zeros);
+        //LOG.info("   zeros: " + zeros);
+        LOG.info("  kernel: " + kernel);
 
     }
 
@@ -50,11 +53,13 @@ public class PreallocationJournalLatencyTest {
         int randInt = rand.nextInt(100);
         File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
 
-        KahaDBStore store = new KahaDBStore();
-        store.setJournalMaxFileLength(16*1204*1024);
+        final KahaDBStore store = new KahaDBStore();
+        store.setCheckpointInterval(5000);
+        store.setJournalMaxFileLength(32*1204*1024);
         store.deleteAllMessages();
         store.setDirectory(dataDirectory);
         store.setPreallocationStrategy(preallocationStrategy);
+        store.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
         store.start();
 
         final File journalLog = new File(dataDirectory, "db-1.log");
@@ -66,7 +71,7 @@ public class PreallocationJournalLatencyTest {
         }));
 
         final Journal journal = store.getJournal();
-        ByteSequence byteSequence = new ByteSequence(new byte[8*1024]);
+        ByteSequence byteSequence = new ByteSequence(new byte[16*1024]);
 
         TimeStatisticImpl timeStatistic = new TimeStatisticImpl("append", "duration");
         for (int i=0;i<5000; i++) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
index a6cdb9e..bbdcde7 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.util.ByteSequence;

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
index 3e41dc9..087b9be 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -224,6 +225,7 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport {
         properties.put("maxFileLength", maxFileLengthVal);
         properties.put("cleanupInterval", "2000");
         properties.put("checkpointInterval", "2000");
+        properties.put("preallocationScope", Journal.PreallocationScope.ENTIRE_JOURNAL.name());
         // there are problems with duplicate dispatch in the cursor, which maintain
         // a map of messages. A dup dispatch can be dropped.
         // see: org.apache.activemq.broker.region.cursors.OrderedPendingList

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
index 6494efe..58e27f8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import javax.jms.*;
 
 import java.io.File;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 
@@ -101,6 +102,7 @@ public class AMQ3120Test {
     private int getFileCount(File dir){
         if (dir.isDirectory()) {
             String[] children = dir.list();
+            LOG.info("Children: " + Arrays.asList(children));
             return children.length;
         }
 
@@ -112,7 +114,7 @@ public class AMQ3120Test {
         final int messageCount = 500;
         startBroker(true);
         int fileCount = getFileCount(kahaDbDir);
-        assertEquals(4, fileCount);
+        assertEquals(5, fileCount);
 
         Connection connection = new ActiveMQConnectionFactory(
                 broker.getTransportConnectors().get(0).getConnectUri()).createConnection();

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
index e965731..5db7579 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
@@ -116,7 +116,7 @@ public class AMQ4323Test {
         final int messageCount = 500;
         startBroker(true);
         int fileCount = getFileCount(kahaDbDir);
-        assertEquals(4, fileCount);
+        assertEquals(5, fileCount);
 
         Connection connection = new ActiveMQConnectionFactory(
                 broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
@@ -149,7 +149,7 @@ public class AMQ4323Test {
             public boolean isSatisified() throws Exception {
                 int fileCount = getFileCount(kahaDbDir);
                 LOG.info("current filecount:" + fileCount);
-                return 4 == fileCount;
+                return 5 == fileCount;
             }
         }));
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
index 4a23331..cf9522f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
@@ -135,7 +135,7 @@ public class KahaDBIndexLocationTest {
 
         // Should contain the initial log for the journal and the lock.
         assertNotNull(journal);
-        assertEquals(2, journal.length);
+        assertEquals(3, journal.length);
     }
 
     @Test


Mime
View raw message