activemq-commits mailing list archives

From cepo...@apache.org
Subject activemq git commit: add options for https://issues.apache.org/jira/browse/AMQ-5578 to allow configuring the journal allocation strategy with finer-grained control, including zeroing out, OS copying, or sparse files
Date Thu, 19 Feb 2015 17:50:16 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 8858dc294 -> 45e59e6e8


Add options for https://issues.apache.org/jira/browse/AMQ-5578 to
allow configuring the journal allocation strategy with finer-grained
control, including zeroing out, OS copying, or sparse files


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/45e59e6e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/45e59e6e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/45e59e6e

Branch: refs/heads/master
Commit: 45e59e6e839ae89ffc099d32a4180ee307543aae
Parents: 8858dc2
Author: Christian Posta <christian.posta@gmail.com>
Authored: Tue Feb 17 08:02:37 2015 -0700
Committer: Christian Posta <christian.posta@gmail.com>
Committed: Thu Feb 19 10:49:40 2015 -0700

----------------------------------------------------------------------
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  16 +++
 .../activemq/store/kahadb/MessageDatabase.java  |  31 +++++
 .../store/kahadb/disk/journal/DataFile.java     |  44 +++++++
 .../kahadb/disk/journal/DataFileAppender.java   |   8 +-
 .../store/kahadb/disk/journal/Journal.java      | 132 +++++++++++++++++--
 .../store/kahadb/disk/util/DiskBenchmark.java   |  32 ++++-
 6 files changed, 248 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
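
For context, a minimal sketch of how the new options could be set programmatically, assuming the setters introduced in KahaDBPersistenceAdapter below; the string values mirror the Journal.PreallocationStrategy and Journal.PreallocationScope enum names from this commit, and the example class itself is hypothetical:

    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

    public class PreallocationConfigExample {
        public static void main(String[] args) throws Exception {
            BrokerService broker = new BrokerService();

            KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
            // Strategy: SPARSE_FILE (default), OS_KERNEL_COPY, or ZEROS; matching is case-insensitive
            kahaDB.setPreallocationStrategy("zeros");
            // Scope: ENTIRE_JOURNAL (default) or BATCH
            kahaDB.setPreallocationScope("entire_journal");

            broker.setPersistenceAdapter(kahaDB);
            broker.start();
            broker.waitUntilStopped();
        }
    }

The same two attributes should also be settable on the kahaDB element of a broker XML configuration, since the adapter exposes them as plain bean properties.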


http://git-wip-us.apache.org/repos/asf/activemq/blob/45e59e6e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 9b83a0e..ebe12f3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -511,6 +511,22 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
         letter.setBrokerService(brokerService);
     }
 
+    public String getPreallocationScope() {
+        return letter.getPreallocationScope();
+    }
+
+    public void setPreallocationScope(String preallocationScope) {
+        this.letter.setPreallocationScope(preallocationScope);
+    }
+
+    public String getPreallocationStrategy() {
+        return letter.getPreallocationStrategy();
+    }
+
+    public void setPreallocationStrategy(String preallocationStrategy) {
+        this.letter.setPreallocationStrategy(preallocationStrategy);
+    }
+
     public boolean isArchiveDataLogs() {
         return letter.isArchiveDataLogs();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/45e59e6e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 477f42c..9fc29f4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -237,8 +237,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     long cleanupInterval = 30*1000;
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+    int preallocationBatchSize = Journal.DEFAULT_PREALLOCATION_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
     int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
+    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
 
     protected AtomicBoolean opened = new AtomicBoolean();
     private boolean ignoreMissingJournalfiles = false;
@@ -2487,6 +2490,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         manager.setArchiveDataLogs(isArchiveDataLogs());
         manager.setSizeAccumulator(journalSize);
         manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
+        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
+        manager.setPreallocationStrategy(
+                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
+        manager.setPreallocationBatchSize(preallocationBatchSize);
         if (getDirectoryArchive() != null) {
             IOHelper.mkdirs(getDirectoryArchive());
             manager.setDirectoryArchive(getDirectoryArchive());
@@ -3175,4 +3182,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     interface IndexAware {
         public void sequenceAssignedWithIndexLocked(long index);
     }
+
+    public String getPreallocationScope() {
+        return preallocationScope;
+    }
+
+    public void setPreallocationScope(String preallocationScope) {
+        this.preallocationScope = preallocationScope;
+    }
+
+    public String getPreallocationStrategy() {
+        return preallocationStrategy;
+    }
+
+    public void setPreallocationStrategy(String preallocationStrategy) {
+        this.preallocationStrategy = preallocationStrategy;
+    }
+
+    public int getPreallocationBatchSize() {
+        return preallocationBatchSize;
+    }
+
+    public void setPreallocationBatchSize(int preallocationBatchSize) {
+        this.preallocationBatchSize = preallocationBatchSize;
+    }
 }
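
A small standalone sketch of the string-to-enum conversion used above when the journal is configured: the configured values are trimmed and upper-cased before the Journal enum lookup, so configuration strings are effectively case-insensitive, and an unknown value fails fast. The demo class name is hypothetical; the enum names come from Journal.java in this commit:

    import org.apache.activemq.store.kahadb.disk.journal.Journal;

    public class PreallocationEnumDemo {
        public static void main(String[] args) {
            // Same conversion as above: trim + upper-case, then valueOf()
            String configuredStrategy = " os_kernel_copy ";
            Journal.PreallocationStrategy strategy =
                    Journal.PreallocationStrategy.valueOf(configuredStrategy.trim().toUpperCase());
            System.out.println("strategy = " + strategy);   // OS_KERNEL_COPY

            String configuredScope = "batch";
            Journal.PreallocationScope scope =
                    Journal.PreallocationScope.valueOf(configuredScope.trim().toUpperCase());
            System.out.println("scope = " + scope);         // BATCH

            // A typo such as "zeroes" would throw IllegalArgumentException from valueOf(),
            // surfacing the misconfiguration at broker startup.
        }
    }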

http://git-wip-us.apache.org/repos/asf/activemq/blob/45e59e6e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index 1c5ee3a..ac35866 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -18,11 +18,16 @@ package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 
 import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * DataFile
@@ -31,10 +36,13 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
  */
 public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataFile.class);
+
     protected final File file;
     protected final Integer dataFileId;
     protected volatile int length;
     protected final SequenceSet corruptedBlocks = new SequenceSet();
+    protected long preallocationBatchWindow = 0L;
 
     DataFile(File file, int number) {
         this.file = file;
@@ -60,6 +68,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
 
     public synchronized void incrementLength(int size) {
         length += size;
+        preallocationBatchWindow -= size;
     }
 
     @Override
@@ -105,4 +114,39 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
     public int hashCode() {
         return dataFileId;
     }
+
+    public void preallocateJournalBatch(Journal journal, long newMessageSize) {
+
+        if (preallocationBatchWindow - newMessageSize <= 0) {
+            int preallocationBatchSize = Math.min(journal.getPreallocationBatchSize(),
+                    journal.maxFileLength - length);
+            doPreallocation(preallocationBatchSize);
+            preallocationBatchWindow = preallocationBatchSize;
+        }
+    }
+
+    private void doPreallocation(int size) {
+        try {
+            RecoverableRandomAccessFile file = openRandomAccessFile();
+            FileChannel channel = file.getChannel();
+
+            channel.position(length+1);
+            ByteBuffer buffer = generateAllocation(size);
+            channel.write(buffer);
+            channel.force(false);
+            file.close();
+        } catch (IOException e) {
+            LOG.debug("Cannot allocate batch for journal, continue without preallocation of batch...");
+        }
+
+    }
+
+    private ByteBuffer generateAllocation(int size) {
+        ByteBuffer rc = ByteBuffer.allocate(size);
+        for (int i = 0; i < size; i++) {
+            rc.put((byte) 0x00);
+        }
+        rc.flip();
+        return rc;
+    }
 }
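
To illustrate the batch-window accounting above: every append shrinks preallocationBatchWindow via incrementLength(), and once the window cannot absorb the next write, preallocateJournalBatch() zero-fills another batch ahead of the writer, capped by the space remaining before the journal's maxFileLength. A minimal arithmetic-only sketch of that behaviour, with hypothetical sizes:

    public class BatchWindowSketch {
        public static void main(String[] args) {
            final int maxFileLength = 32 * 1024 * 1024;      // stand-in for journal.maxFileLength
            final int preallocationBatchSize = 1024 * 1024;  // Journal.DEFAULT_PREALLOCATION_BATCH_SIZE
            int length = 0;                                  // DataFile.length
            long window = 0;                                 // DataFile.preallocationBatchWindow

            int[] writes = {300_000, 400_000, 500_000};      // hypothetical message sizes
            for (int size : writes) {
                if (window - size <= 0) {
                    // preallocateJournalBatch(): never preallocate past the end of the data file
                    int batch = Math.min(preallocationBatchSize, maxFileLength - length);
                    System.out.println("zero-fill " + batch + " bytes starting near offset " + length);
                    window = batch;
                }
                // incrementLength(): the write consumes part of the preallocated window
                length += size;
                window -= size;
            }
        }
    }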

http://git-wip-us.apache.org/repos/asf/activemq/blob/45e59e6e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index f5d7e10..45ae047 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -211,6 +211,11 @@ class DataFileAppender implements FileAppender {
                         file = journal.rotateWriteFile();
                     }
 
+                    // will do batch preallocation on the journal if configured
+                    if (journal.preallocationScope == Journal.PreallocationScope.BATCH) {
+                        file.preallocateJournalBatch(journal, write.location.getSize());
+                    }
+
                     nextWriteBatch = newWriteBatch(write, file);
                     enqueueMutex.notifyAll();
                     break;
@@ -314,8 +319,7 @@ class DataFileAppender implements FileAppender {
                     dataFile = wb.dataFile;
                     file = dataFile.openRandomAccessFile();
                     // pre allocate on first open
-                    file.seek(journal.maxFileLength-1);
-                    file.write(end);
+                    journal.preallocateEntireJournalDataFile(file);
                 }
 
                 Journal.WriteCommand write = wb.writes.getHead();

http://git-wip-us.apache.org/repos/asf/activemq/blob/45e59e6e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 5541e9f..221e929 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -16,10 +16,9 @@
  */
 package org.apache.activemq.store.kahadb.disk.journal;
 
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,12 +26,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
-import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
 import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
 import org.apache.activemq.store.kahadb.disk.util.Sequence;
@@ -58,6 +54,17 @@ public class Journal {
     public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
     public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
 
+    public enum PreallocationStrategy {
+        SPARSE_FILE,
+        OS_KERNEL_COPY,
+        ZEROS;
+    }
+
+    public enum PreallocationScope {
+        BATCH,
+        ENTIRE_JOURNAL;
+    }
+
     private static byte[] createBatchControlRecordHeader() {
         try {
             DataByteArrayOutputStream os = new DataByteArrayOutputStream();
@@ -80,6 +87,7 @@ public class Journal {
     public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
     public static final int PREFERED_DIFF = 1024 * 512;
     public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
+    public static final int DEFAULT_PREALLOCATION_BATCH_SIZE = 1024 * 1024 * 1;
 
     private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
 
@@ -113,6 +121,10 @@ public class Journal {
     protected boolean enableAsyncDiskSync = true;
     private Timer timer;
 
+    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
+    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
+    protected int preallocationBatchSize = DEFAULT_PREALLOCATION_BATCH_SIZE;
+
     public interface DataFileRemovedListener {
         void fileRemoved(DataFile datafile);
     }
@@ -181,6 +193,7 @@ public class Journal {
             totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
         }
 
+
         cleanupTask = new Runnable() {
             public void run() {
                 cleanup();
@@ -193,6 +206,85 @@ public class Journal {
         LOG.trace("Startup took: "+(end-start)+" ms");
     }
 
+
+    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
+
+        if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
+
+            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
+                doPreallocationKernelCopy(file);
+
+            }else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
+                doPreallocationZeros(file);
+            }
+            else {
+                doPreallocationSparseFile(file);
+            }
+        }else {
+            LOG.info("Using journal preallocation scope of batch allocation");
+        }
+    }
+
+    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
+        LOG.info("Preallocate journal file with sparse file");
+        try {
+            file.seek(maxFileLength - 1);
+            file.write((byte)0x00);
+        } catch (IOException e) {
+            LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e);
+        }
+    }
+
+    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
+        LOG.info("Preallocate journal file with zeros");
+        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
+        for (int i = 0; i < maxFileLength; i++) {
+            buffer.put((byte) 0x00);
+        }
+        buffer.flip();
+
+        try {
+            FileChannel channel = file.getChannel();
+            channel.write(buffer);
+            channel.force(false);
+            channel.position(0);
+        } catch (IOException e) {
+            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
+        }
+    }
+
+    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
+        LOG.info("Preallocate journal file with kernel file copying");
+
+        // create a template file that will be used to pre-allocate the journal files
+        File templateFile = createJournalTemplateFile();
+
+        RandomAccessFile templateRaf = null;
+        try {
+            templateRaf = new RandomAccessFile(templateFile, "rw");
+            templateRaf.setLength(maxFileLength);
+            templateRaf.getChannel().force(true);
+            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
+            templateRaf.close();
+            templateFile.delete();
+        } catch (FileNotFoundException e) {
+            LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e);
+        } catch (IOException e) {
+            LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e);
+        }
+    }
+
+    private File createJournalTemplateFile() {
+        String fileName = "db-log.template";
+        File rc  = new File(directory, fileName);
+        if (rc.exists()) {
+            System.out.println("deleting file because it already exists...");
+            rc.delete();
+
+        }
+        return rc;
+    }
+
     private static byte[] bytes(String string) {
         try {
             return string.getBytes("UTF-8");
@@ -570,6 +662,30 @@ public class Journal {
         }
     }
 
+    public PreallocationStrategy getPreallocationStrategy() {
+        return preallocationStrategy;
+    }
+
+    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
+        this.preallocationStrategy = preallocationStrategy;
+    }
+
+    public PreallocationScope getPreallocationScope() {
+        return preallocationScope;
+    }
+
+    public void setPreallocationScope(PreallocationScope preallocationScope) {
+        this.preallocationScope = preallocationScope;
+    }
+
+    public int getPreallocationBatchSize() {
+        return preallocationBatchSize;
+    }
+
+    public void setPreallocationBatchSize(int preallocationBatchSize) {
+        this.preallocationBatchSize = preallocationBatchSize;
+    }
+
     public File getDirectory() {
         return directory;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/45e59e6e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
index 641fe79..4cffa6b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.store.kahadb.disk.util;
 
+import org.apache.activemq.util.RecoverableRandomAccessFile;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -193,7 +195,7 @@ public class DiskBenchmark {
         }
     }
 
-    public Report benchmark(File file) throws IOException {
+    public Report benchmark(File file) throws Exception {
         Report rc = new Report();
 
         // Initialize the block we will be writing to disk.
@@ -203,8 +205,9 @@ public class DiskBenchmark {
         }
 
         rc.size = data.length;
-        RandomAccessFile raf = new RandomAccessFile(file, "rw");
-        raf.setLength(size);
+        RecoverableRandomAccessFile raf = new RecoverableRandomAccessFile(file, "rw");
+//        RandomAccessFile raf = new RandomAccessFile(file, "rw");
+        preallocateDataFile(raf, file.getParentFile());
 
         // Figure out how many writes we can do in the sample interval.
         long start = System.currentTimeMillis();
@@ -235,7 +238,7 @@ public class DiskBenchmark {
         rc.writes = ioCount;
         rc.writeDuration = (now - start);
 
-        raf = new RandomAccessFile(file, "rw");
+        raf = new RecoverableRandomAccessFile(file, "rw");
         start = System.currentTimeMillis();
         now = System.currentTimeMillis();
         ioCount = 0;
@@ -259,7 +262,7 @@ public class DiskBenchmark {
         rc.syncWrites = ioCount;
         rc.syncWriteDuration = (now - start);
 
-        raf = new RandomAccessFile(file, "rw");
+        raf = new RecoverableRandomAccessFile(file, "rw");
         start = System.currentTimeMillis();
         now = System.currentTimeMillis();
         ioCount = 0;
@@ -285,6 +288,25 @@ public class DiskBenchmark {
         return rc;
     }
 
+    private void preallocateDataFile(RecoverableRandomAccessFile raf, File location) throws Exception {
+        File tmpFile;
+        if (location != null && location.isDirectory()) {
+            tmpFile = new File(location, "template.dat");
+        }else {
+            tmpFile = new File("template.dat");
+        }
+        if (tmpFile.exists()) {
+            tmpFile.delete();
+        }
+        System.out.println("Using a template file: " + tmpFile.getAbsolutePath());
+        RandomAccessFile templateFile = new RandomAccessFile(tmpFile, "rw");
+        templateFile.setLength(size);
+        templateFile.getChannel().force(true);
+        templateFile.getChannel().transferTo(0, size, raf.getChannel());
+        templateFile.close();
+        tmpFile.delete();
+    }
+
     public boolean isVerbose() {
         return verbose;
     }
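
The benchmark now preallocates its data file the same way (a sized template file transferred with transferTo()) before timing writes, so the measurements are not skewed by the file growing underneath the test. A hedged usage sketch, assuming Report remains a public nested class with a readable toString(); the file name and wrapper class are hypothetical:

    import java.io.File;

    import org.apache.activemq.store.kahadb.disk.util.DiskBenchmark;

    public class DiskBenchmarkRun {
        public static void main(String[] args) throws Exception {
            DiskBenchmark benchmark = new DiskBenchmark();
            File dataFile = new File("disk-benchmark.dat");
            // benchmark(File) now declares "throws Exception" and preallocates the file first
            DiskBenchmark.Report report = benchmark.benchmark(dataFile);
            System.out.println(report);
            dataFile.delete();
        }
    }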

