activemq-commits mailing list archives

From gtu...@apache.org
Subject svn commit: r1222471 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/journal/ kahadb/src/main/java/org/apache/kahad...
Date Thu, 22 Dec 2011 21:44:01 GMT
Author: gtully
Date: Thu Dec 22 21:44:01 2011
New Revision: 1222471

URL: http://svn.apache.org/viewvc?rev=1222471&view=rev
Log:
Add an experimental appender that takes the buffering burden off the writer thread; it, together with some
write-statistics tracing, can be enabled via -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true.
Additional accessors on KahaDB allow the index to be further configured for the fast-but-may-need-recovery case.
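
Journal reads CALLER_BUFFER_APPENDER into a static final field (see the Journal.java change below), so the property has to be in place before the Journal class is loaded; the -D flags above do that. A minimal, purely illustrative sketch of setting both toggles programmatically instead; only the two property names and the "false" default come from this commit, the bootstrap class is hypothetical:

    // Hypothetical bootstrap class, not part of this commit.
    public class ExperimentalAppenderBootstrap {
        public static void main(String[] args) throws Exception {
            // Select the caller-buffering appender (Journal defaults this to "false").
            System.setProperty("org.apache.kahadb.journal.CALLER_BUFFER_APPENDER", "true");
            // Per the log message, this enables write-stat tracing with a 10000-write window.
            System.setProperty("org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW", "10000");
            // ... start the broker / persistence adapter only after the properties are set ...
        }
    }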

Added:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
      - copied, changed from r1222219, activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu Dec 22 21:44:01 2011
@@ -537,6 +537,30 @@ public class KahaDBPersistenceAdapter im
         letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
     }
 
+    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
+        letter.setEnableIndexDiskSyncs(diskSyncs);
+    }
+
+    public boolean isEnableIndexDiskSyncs() {
+        return letter.isEnableIndexDiskSyncs();
+    }
+
+    public void setEnableIndexRecoveryFile(boolean enable) {
+        letter.setEnableIndexRecoveryFile(enable);
+    }
+
+    public boolean isEnableIndexRecoveryFile() {
+        return letter.isEnableIndexRecoveryFile();
+    }
+
+    public void setEnableIndexPageCaching(boolean enable) {
+        letter.setEnableIndexPageCaching(enable);
+    }
+
+    public boolean isEnableIndexPageCaching() {
+        return letter.isEnableIndexPageCaching();
+    }
+
     public KahaDBStore getStore() {
         return letter;
     }
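
The new adapter-level accessors expose the fast-but-may-need-recovery trade-off directly on KahaDBPersistenceAdapter. A hedged sketch, mirroring what KahaDBFastEnqueueTest does later in this commit; only the setters come from the commit, the wrapper class is illustrative:

    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

    public class FastIndexConfigSketch {
        public static KahaDBPersistenceAdapter fastAdapter() {
            KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
            adapter.setEnableIndexDiskSyncs(false);     // skip syncing index writes to disk
            adapter.setEnableIndexRecoveryFile(false);  // skip writing the index recovery file
            adapter.setEnableIndexPageCaching(true);    // default; keep index pages cached
            return adapter;
        }
    }

All three flags default to true in MessageDatabase, so existing configurations keep their previous behaviour.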

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Dec 22 21:44:01 2011
@@ -184,6 +184,9 @@ public abstract class MessageDatabase ex
     private boolean archiveCorruptedIndex = false;
     private boolean useIndexLFRUEviction = false;
     private float indexLFUEvictionFactor = 0.2f;
+    private boolean enableIndexDiskSyncs = true;
+    private boolean enableIndexRecoveryFile = true;
+    private boolean enableIndexPageCaching = true;
 
     public MessageDatabase() {
     }
@@ -2058,6 +2061,9 @@ public abstract class MessageDatabase ex
         index.setPageCacheSize(indexCacheSize);
         index.setUseLFRUEviction(isUseIndexLFRUEviction());
         index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
+        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
+        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
+        index.setEnablePageCaching(isEnableIndexPageCaching());
         return index;
     }
 
@@ -2297,6 +2303,30 @@ public abstract class MessageDatabase ex
         this.useIndexLFRUEviction = useIndexLFRUEviction;
     }
 
+    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
+        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
+    }
+
+    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
+        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
+    }
+
+    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
+        this.enableIndexPageCaching = enableIndexPageCaching;
+    }
+
+    public boolean isEnableIndexDiskSyncs() {
+        return enableIndexDiskSyncs;
+    }
+
+    public boolean isEnableIndexRecoveryFile() {
+        return enableIndexRecoveryFile;
+    }
+
+    public boolean isEnableIndexPageCaching() {
+        return enableIndexPageCaching;
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     // /////////////////////////////////////////////////////////////////

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java Thu Dec 22 21:44:01 2011
@@ -35,6 +35,7 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ConnectionControl;
 import org.junit.After;
 import org.junit.Ignore;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,7 @@ public class KahaDBFastEnqueueTest {
     private boolean useBytesMessage= true;
     private final int parallelProducer = 2;
     private Vector<Exception> exceptions = new Vector<Exception>();
-    final long toSend = 500000;
+    final long toSend = 1000;//500000;
 
     @Ignore("not ready yet, exploring getting broker disk bound")
     public void testPublishNoConsumer() throws Exception {
@@ -91,8 +92,8 @@ public class KahaDBFastEnqueueTest {
         System.out.println("Journal writes %:    " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%");
         //System.out.println("Index writes %:       " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten / (double)totalSent * 100 + "%");
 
-        //restartBroker(0);
-        //consumeMessages(toSend);
+        restartBroker(0);
+        consumeMessages(toSend);
     }
 
     private void consumeMessages(long count) throws Exception {
@@ -158,11 +159,13 @@ public class KahaDBFastEnqueueTest {
         kahaDBPersistenceAdapter.setCheckpointInterval(20 * 60 * 1000);
 
         // optimise for disk best batch rate
-        //kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(128*1024); //4mb default
-        kahaDBPersistenceAdapter.setJournalMaxFileLength(1024*1024*1024); // 32mb default
+        kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24*1024*1024); //4mb default
+        kahaDBPersistenceAdapter.setJournalMaxFileLength(128*1024*1024); // 32mb default
         // keep index in memory
         kahaDBPersistenceAdapter.setIndexCacheSize(500000);
         kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000);
+        kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
+        kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
 
         broker.setUseJmx(false);
         broker.addConnector("tcp://0.0.0.0:0");

Copied: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
(from r1222219, activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java?p2=activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java&p1=activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java&r1=1222219&r2=1222471&rev=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java Thu Dec 22 21:44:01 2011
@@ -21,26 +21,26 @@ import java.io.InterruptedIOException;
 import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
-
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
 /**
  * An optimized writer to do batch appends to a data file. This object is thread
  * safe and gains throughput as you increase the number of concurrent writes it
  * does.
- * 
+ * The thread calling enqueue does the file open and buffering of the data, which
+ * reduces the round trip of the write thread.
  * 
  */
-class DataFileAppender {
+class CallerBufferingDataFileAppender implements FileAppender {
 
     protected final Journal journal;
-    protected final Map<WriteKey, WriteCommand> inflightWrites;
+    protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
     protected final Object enqueueMutex = new Object() {
     };
     protected WriteBatch nextWriteBatch;
@@ -53,51 +53,34 @@ class DataFileAppender {
     private boolean running;
     private Thread thread;
 
-    public static class WriteKey {
-        private final int file;
-        private final long offset;
-        private final int hash;
-
-        public WriteKey(Location item) {
-            file = item.getDataFileId();
-            offset = item.getOffset();
-            // TODO: see if we can build a better hash
-            hash = (int)(file ^ offset);
-        }
-
-        public int hashCode() {
-            return hash;
-        }
-
-        public boolean equals(Object obj) {
-            if (obj instanceof WriteKey) {
-                WriteKey di = (WriteKey)obj;
-                return di.file == file && di.offset == offset;
-            }
-            return false;
-        }
-    }
-
+    final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] {
+            new DataByteArrayOutputStream(maxWriteBatchSize),
+            new DataByteArrayOutputStream(maxWriteBatchSize)
+    };
+    AtomicInteger writeBatchInstanceCount = new AtomicInteger();
     public class WriteBatch {
 
+        DataByteArrayOutputStream buff = cachedBuffers[writeBatchInstanceCount.getAndIncrement()%2];
         public final DataFile dataFile;
 
-        public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
+        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
         public final CountDownLatch latch = new CountDownLatch(1);
 		private final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
         public AtomicReference<IOException> exception = new AtomicReference<IOException>();
+        public boolean forceToDisk;
 
-        public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
             this.dataFile = dataFile;
 			this.offset = offset;
             this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
             this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
             journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            initBuffer(buff);
             append(write);
         }
 
-        public boolean canAppend(WriteCommand write) {
+        public boolean canAppend(Journal.WriteCommand write) {
             int newSize = size + write.location.getSize();
 			if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
                 return false;
@@ -105,7 +88,7 @@ class DataFileAppender {
             return true;
         }
 
-        public void append(WriteCommand write) throws IOException {
+        public void append(Journal.WriteCommand write) throws IOException {
             this.writes.addLast(write);
             write.location.setDataFileId(dataFile.getDataFileId());
             write.location.setOffset(offset+size);
@@ -113,34 +96,22 @@ class DataFileAppender {
 			size += s;
             dataFile.incrementLength(s);
             journal.addToTotalLength(s);
+            forceToDisk |= appendToBuffer(write, buff);
         }
     }
 
-    public static class WriteCommand extends LinkedNode<WriteCommand> {
-        public final Location location;
-        public final ByteSequence data;
-        final boolean sync;
-        public final Runnable onComplete;
-
-        public WriteCommand(Location location, ByteSequence data, boolean sync) {
-            this.location = location;
-            this.data = data;
-            this.sync = sync;
-            this.onComplete = null;
-        }
-
-        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
-            this.location = location;
-            this.data = data;
-            this.onComplete = onComplete;
-            this.sync = false;
-        }
+    private void initBuffer(DataByteArrayOutputStream buff) throws IOException {
+        // Write an empty batch control record.
+        buff.reset();
+        buff.write(Journal.BATCH_CONTROL_RECORD_HEADER);
+        buff.writeInt(0);
+        buff.writeLong(0);
     }
 
     /**
      * Construct a Store writer
      */
-    public DataFileAppender(Journal dataManager) {
+    public CallerBufferingDataFileAppender(Journal dataManager) {
         this.journal = dataManager;
         this.inflightWrites = this.journal.getInflightWrites();
         this.maxWriteBatchSize = this.journal.getWriteBatchSize();
@@ -155,7 +126,7 @@ class DataFileAppender {
         location.setSize(size);
         location.setType(type);
 
-        WriteCommand write = new WriteCommand(location, data, sync);
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
 
         WriteBatch batch = enqueue(write);
         location.setLatch(batch.latch);
@@ -182,7 +153,7 @@ class DataFileAppender {
         location.setSize(size);
         location.setType(type);
 
-        WriteCommand write = new WriteCommand(location, data, onComplete);
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
 
         WriteBatch batch = enqueue(write);
  
@@ -190,7 +161,7 @@ class DataFileAppender {
         return location;
     }
 
-    private WriteBatch enqueue(WriteCommand write) throws IOException {
+    private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
         synchronized (enqueueMutex) {
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
@@ -220,7 +191,7 @@ class DataFileAppender {
 	            	if( file.getLength() > journal.getMaxFileLength() ) {
 	            		file = journal.rotateWriteFile();
 	            	}
-	            	
+
 	                nextWriteBatch = new WriteBatch(file, file.getLength(), write);
 	                enqueueMutex.notifyAll();
 	                break;
@@ -249,7 +220,7 @@ class DataFileAppender {
 	            }
             }
             if (!write.sync) {
-                inflightWrites.put(new WriteKey(write.location), write);
+                inflightWrites.put(new Journal.WriteKey(write.location), write);
             }
             return nextWriteBatch;
         }
@@ -293,7 +264,6 @@ class DataFileAppender {
         WriteBatch wb = null;
         try {
 
-            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
             while (true) {
 
                 Object o = null;
@@ -327,24 +297,8 @@ class DataFileAppender {
                     }
                 }
 
-                WriteCommand write = wb.writes.getHead();
-
-                // Write an empty batch control record.
-                buff.reset();
-                buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
-                buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
-                buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
-                buff.writeInt(0);
-                buff.writeLong(0);
-                
-                boolean forceToDisk = false;
-                while (write != null) {
-                    forceToDisk |= write.sync | write.onComplete != null;
-                    buff.writeInt(write.location.getSize());
-                    buff.writeByte(write.location.getType());
-                    buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
-                    write = write.getNext();
-                }
+                final DataByteArrayOutputStream buff = wb.buff;
+                final boolean forceToDisk = wb.forceToDisk;
 
                 ByteSequence sequence = buff.toByteSequence();
                 
@@ -372,7 +326,6 @@ class DataFileAppender {
                     }
                 }
                 file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
-                
                 ReplicationTarget replicationTarget = journal.getReplicationTarget();
                 if( replicationTarget!=null ) {
                 	replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
@@ -382,16 +335,16 @@ class DataFileAppender {
                     file.getFD().sync();
                 }
 
-                WriteCommand lastWrite = wb.writes.getTail();
+                Journal.WriteCommand lastWrite = wb.writes.getTail();
                 journal.setLastAppendLocation(lastWrite.location);
 
                 // Now that the data is on disk, remove the writes from the in
                 // flight
                 // cache.
-                write = wb.writes.getHead();
+                Journal.WriteCommand write = wb.writes.getHead();
                 while (write != null) {
                     if (!write.sync) {
-                        inflightWrites.remove(new WriteKey(write.location));
+                        inflightWrites.remove(new Journal.WriteKey(write.location));
                     }
                     if (write.onComplete != null) {
                         try {
@@ -431,4 +384,10 @@ class DataFileAppender {
         }
     }
 
+    private boolean appendToBuffer(Journal.WriteCommand write, DataByteArrayOutputStream buff) throws IOException {
+        buff.writeInt(write.location.getSize());
+        buff.writeByte(write.location.getType());
+        buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+        return write.sync | write.onComplete != null;
+    }
 }
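
The essential change in this copy is that each WriteBatch serializes its records into one of two reusable DataByteArrayOutputStream buffers on the caller's enqueue path (appendToBuffer), so the writer thread is left with only the file write, the optional sync, and the completion callbacks. A standalone, purely illustrative sketch of that double-buffer handoff; every name below is hypothetical and the real batching, latch, and checksum logic is omitted:

    import java.io.ByteArrayOutputStream;
    import java.io.FileOutputStream;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class CallerBufferingSketch {
        // Two reusable buffers, echoing cachedBuffers[] above.
        private final BlockingQueue<ByteArrayOutputStream> free = new ArrayBlockingQueue<>(2);
        private final BlockingQueue<ByteArrayOutputStream> ready = new ArrayBlockingQueue<>(2);

        public CallerBufferingSketch() {
            free.offer(new ByteArrayOutputStream());
            free.offer(new ByteArrayOutputStream());
        }

        // Caller thread: pays the serialization cost, then hands the finished buffer off.
        public void enqueue(byte[] record) throws Exception {
            ByteArrayOutputStream buff = free.take();
            buff.reset();
            buff.write(record);
            ready.put(buff);
        }

        // Writer thread: only the raw write and the sync remain on its critical path.
        public void writerLoop(FileOutputStream file) throws Exception {
            while (true) {
                ByteArrayOutputStream buff = ready.take();
                buff.writeTo(file);
                file.getFD().sync();
                free.put(buff);   // recycle the buffer for the next batch
            }
        }
    }

In the real appender the sync decision also travels with the batch: append() accumulates forceToDisk |= appendToBuffer(write, buff), so the caller thread records whether any write in the batch requires an fsync before the writer thread picks it up.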

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Thu Dec 22 21:44:01 2011
@@ -33,7 +33,7 @@ public class DataFile extends LinkedNode
 
     protected final File file;
     protected final Integer dataFileId;
-    protected int length;
+    protected volatile int length;
     protected final SequenceSet corruptedBlocks = new SequenceSet();
 
     DataFile(File file, int number, int preferedSize) {

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java Thu Dec 22 21:44:01 2011
@@ -20,8 +20,6 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Map;
 
-import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
-import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
 
 /**
@@ -33,7 +31,7 @@ import org.apache.kahadb.util.ByteSequen
 final class DataFileAccessor {
 
     private final DataFile dataFile;
-    private final Map<WriteKey, WriteCommand> inflightWrites;
+    private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
     private final RandomAccessFile file;
     private boolean disposed;
 
@@ -71,7 +69,7 @@ final class DataFileAccessor {
             throw new IOException("Invalid location: " + location);
         }
 
-        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
         if (asyncWrite != null) {
             return asyncWrite.data;
         }
@@ -106,7 +104,7 @@ final class DataFileAccessor {
     }
 
     public void readLocationDetails(Location location) throws IOException {
-        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
         if (asyncWrite != null) {
             location.setSize(asyncWrite.location.getSize());
             location.setType(asyncWrite.location.getType());

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Thu Dec 22 21:44:01 2011
@@ -27,7 +27,6 @@ import java.util.zip.Checksum;
 
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
 /**
@@ -37,10 +36,10 @@ import org.apache.kahadb.util.LinkedNode
  * 
  * 
  */
-class DataFileAppender {
+class DataFileAppender implements FileAppender {
 
     protected final Journal journal;
-    protected final Map<WriteKey, WriteCommand> inflightWrites;
+    protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
     protected final Object enqueueMutex = new Object() {
     };
     protected WriteBatch nextWriteBatch;
@@ -82,13 +81,13 @@ class DataFileAppender {
 
         public final DataFile dataFile;
 
-        public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
+        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
         public final CountDownLatch latch = new CountDownLatch(1);
 		private final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
         public AtomicReference<IOException> exception = new AtomicReference<IOException>();
 
-        public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
             this.dataFile = dataFile;
 			this.offset = offset;
             this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
@@ -97,7 +96,7 @@ class DataFileAppender {
             append(write);
         }
 
-        public boolean canAppend(WriteCommand write) {
+        public boolean canAppend(Journal.WriteCommand write) {
             int newSize = size + write.location.getSize();
 			if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
                 return false;
@@ -105,7 +104,7 @@ class DataFileAppender {
             return true;
         }
 
-        public void append(WriteCommand write) throws IOException {
+        public void append(Journal.WriteCommand write) throws IOException {
             this.writes.addLast(write);
             write.location.setDataFileId(dataFile.getDataFileId());
             write.location.setOffset(offset+size);
@@ -116,27 +115,6 @@ class DataFileAppender {
         }
     }
 
-    public static class WriteCommand extends LinkedNode<WriteCommand> {
-        public final Location location;
-        public final ByteSequence data;
-        final boolean sync;
-        public final Runnable onComplete;
-
-        public WriteCommand(Location location, ByteSequence data, boolean sync) {
-            this.location = location;
-            this.data = data;
-            this.sync = sync;
-            this.onComplete = null;
-        }
-
-        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
-            this.location = location;
-            this.data = data;
-            this.onComplete = onComplete;
-            this.sync = false;
-        }
-    }
-
     /**
      * Construct a Store writer
      */
@@ -155,7 +133,7 @@ class DataFileAppender {
         location.setSize(size);
         location.setType(type);
 
-        WriteCommand write = new WriteCommand(location, data, sync);
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
 
         WriteBatch batch = enqueue(write);
         location.setLatch(batch.latch);
@@ -182,7 +160,7 @@ class DataFileAppender {
         location.setSize(size);
         location.setType(type);
 
-        WriteCommand write = new WriteCommand(location, data, onComplete);
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
 
         WriteBatch batch = enqueue(write);
  
@@ -190,7 +168,7 @@ class DataFileAppender {
         return location;
     }
 
-    private WriteBatch enqueue(WriteCommand write) throws IOException {
+    private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
         synchronized (enqueueMutex) {
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
@@ -220,7 +198,7 @@ class DataFileAppender {
 	            	if( file.getLength() > journal.getMaxFileLength() ) {
 	            		file = journal.rotateWriteFile();
 	            	}
-	            	
+
 	                nextWriteBatch = new WriteBatch(file, file.getLength(), write);
 	                enqueueMutex.notifyAll();
 	                break;
@@ -249,7 +227,7 @@ class DataFileAppender {
 	            }
             }
             if (!write.sync) {
-                inflightWrites.put(new WriteKey(write.location), write);
+                inflightWrites.put(new Journal.WriteKey(write.location), write);
             }
             return nextWriteBatch;
         }
@@ -327,7 +305,7 @@ class DataFileAppender {
                     }
                 }
 
-                WriteCommand write = wb.writes.getHead();
+                Journal.WriteCommand write = wb.writes.getHead();
 
                 // Write an empty batch control record.
                 buff.reset();
@@ -336,7 +314,7 @@ class DataFileAppender {
                 buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
                 buff.writeInt(0);
                 buff.writeLong(0);
-                
+
                 boolean forceToDisk = false;
                 while (write != null) {
                     forceToDisk |= write.sync | write.onComplete != null;
@@ -382,7 +360,7 @@ class DataFileAppender {
                     file.getFD().sync();
                 }
 
-                WriteCommand lastWrite = wb.writes.getTail();
+                Journal.WriteCommand lastWrite = wb.writes.getTail();
                 journal.setLastAppendLocation(lastWrite.location);
 
                 // Now that the data is on disk, remove the writes from the in
@@ -391,7 +369,7 @@ class DataFileAppender {
                 write = wb.writes.getHead();
                 while (write != null) {
                     if (!write.sync) {
-                        inflightWrites.remove(new WriteKey(write.location));
+                        inflightWrites.remove(new Journal.WriteKey(write.location));
                     }
                     if (write.onComplete != null) {
                         try {

Added: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java?rev=1222471&view=auto
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java (added)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java Thu Dec 22 21:44:01 2011
@@ -0,0 +1,15 @@
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * User: gtully
+ */
+public interface FileAppender {
+    Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException;
+
+    Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException;
+
+    void close() throws IOException;
+}

Propchange: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Thu Dec 22 21:44:01 2011
@@ -36,10 +36,9 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
+import org.apache.kahadb.util.LinkedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
-import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
@@ -53,6 +52,8 @@ import org.apache.kahadb.util.Sequence;
  * 
  */
 public class Journal {
+    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
+    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
 
     private static final int MAX_BATCH_SIZE = 32*1024*1024;
 
@@ -103,7 +104,7 @@ public class Journal {
     protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
     protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
     
-    protected DataFileAppender appender;
+    protected FileAppender appender;
     protected DataFileAccessorPool accessorPool;
 
     protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
@@ -130,7 +131,7 @@ public class Journal {
         started = true;
         preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
 
-        appender = new DataFileAppender(this);
+        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
 
         File[] files = directory.listFiles(new FilenameFilter() {
             public boolean accept(File dir, String n) {
@@ -751,4 +752,50 @@ public class Journal {
     public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
     }
+
+    public static class WriteCommand extends LinkedNode<WriteCommand> {
+        public final Location location;
+        public final ByteSequence data;
+        final boolean sync;
+        public final Runnable onComplete;
+
+        public WriteCommand(Location location, ByteSequence data, boolean sync) {
+            this.location = location;
+            this.data = data;
+            this.sync = sync;
+            this.onComplete = null;
+        }
+
+        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
+            this.location = location;
+            this.data = data;
+            this.onComplete = onComplete;
+            this.sync = false;
+        }
+    }
+
+    public static class WriteKey {
+        private final int file;
+        private final long offset;
+        private final int hash;
+
+        public WriteKey(Location item) {
+            file = item.getDataFileId();
+            offset = item.getOffset();
+            // TODO: see if we can build a better hash
+            hash = (int)(file ^ offset);
+        }
+
+        public int hashCode() {
+            return hash;
+        }
+
+        public boolean equals(Object obj) {
+            if (obj instanceof WriteKey) {
+                WriteKey di = (WriteKey)obj;
+                return di.file == file && di.offset == offset;
+            }
+            return false;
+        }
+    }
 }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Dec 22 21:44:01 2011
@@ -1039,7 +1039,9 @@ public class PageFile {
         }
 
         Checksum checksum = new Adler32();
-        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+        if (enableRecoveryFile) {
+            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+        }
         for (PageWrite w : batch) {
             if (enableRecoveryFile) {
                 try {
@@ -1078,7 +1080,9 @@ public class PageFile {
 
             if (enableDiskSyncs) {
                 // Sync to make sure recovery buffer writes land on disk..
-                recoveryFile.getFD().sync();
+                if (enableRecoveryFile) {
+                    recoveryFile.getFD().sync();
+                }
                 writeFile.getFD().sync();
             }
         } finally {


