activemq-commits mailing list archives

From gtu...@apache.org
Subject svn commit: r1222705 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/...
Date Fri, 23 Dec 2011 15:44:52 GMT
Author: gtully
Date: Fri Dec 23 15:44:52 2011
New Revision: 1222705

URL: http://svn.apache.org/viewvc?rev=1222705&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3646 - Allow KahaDB to run without disk syncs, for
higher throughput without the JMS persistence guarantee. Allow a period of 0 to disable
checkpoint/cleanup. Allow the JMX gc operation to invoke cleanup so that store GC can be
initiated via JMX to ensure disk is reclaimed. Ensure the periodic checkpoint does not sync
when enableJournalDiskSyncs=false; it waits for completion but does not force to disk. Fix
cached buffer allocation and refactor to reuse more code in CallerBufferingDataFileAppender.

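For illustration, a minimal sketch of configuring KahaDB for this mode, using the adapter
setters exercised by the test changes below (the class name and embedded-broker setup are
assumptions, not part of this commit):

    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

    // Hypothetical example: run an embedded broker without journal disk syncs
    // and with the periodic checkpoint/cleanup thread disabled.
    public class NoSyncBrokerExample {
        public static void main(String[] args) throws Exception {
            BrokerService broker = new BrokerService();
            KahaDBPersistenceAdapter kahaDB =
                    (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
            kahaDB.setEnableJournalDiskSyncs(false); // no force-to-disk on journal writes
            kahaDB.setCheckpointInterval(0);         // 0 disables periodic checkpoint
            kahaDB.setCleanupInterval(0);            // 0 disables periodic cleanup
            broker.addConnector("tcp://0.0.0.0:0");
            broker.start();
        }
    }

With both intervals at 0, checkpoint/cleanup happens only on clean shutdown/restart, or when
triggered explicitly, e.g. via the JMX gc operation shown in the first diff below.
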
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Fri Dec 23 15:44:52 2011
@@ -38,12 +38,14 @@ import org.apache.activemq.command.Consu
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.BrokerSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *
  */
 public class BrokerView implements BrokerViewMBean {
-
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
     ManagedRegionBroker broker;
     private final BrokerService brokerService;
     private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
@@ -76,6 +78,11 @@ public class BrokerView implements Broke
 
     public void gc() throws Exception {
         brokerService.getBroker().gc();
+        try {
+            brokerService.getPersistenceAdapter().checkpoint(true);
+        } catch (IOException e) {
+            LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e);
+        }
     }
 
     public void start() throws Exception {

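A sketch of triggering this new cleanup path remotely over JMX; the connector URL and the
object-name layout here are assumptions to be adapted to the broker's actual JMX
configuration:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class JmxGcExample {
        public static void main(String[] args) throws Exception {
            // assumed connector address; depends on how JMX is exposed
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // assumed broker name "localhost"; name follows the 5.x convention
                ObjectName broker = new ObjectName(
                        "org.apache.activemq:BrokerName=localhost,Type=Broker");
                // gc() now also checkpoints the persistence adapter, so journal
                // files can be reclaimed on demand
                mbs.invoke(broker, "gc", null, null);
            } finally {
                connector.close();
            }
        }
    }
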
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Dec 23 15:44:52 2011
@@ -980,7 +980,7 @@ public class KahaDBStore extends Message
     }
 
     public void checkpoint(boolean sync) throws IOException {
-        super.checkpointCleanup(false);
+        super.checkpointCleanup(sync);
     }
 
     // /////////////////////////////////////////////////////////////////

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Dec 23 15:44:52 2011
@@ -67,7 +67,6 @@ public abstract class MessageDatabase ex
     static final int CLOSED_STATE = 1;
     static final int OPEN_STATE = 2;
     static final long NOT_ACKED = -1;
-    static final long UNMATCHED_SEQ = -2;
 
     static final int VERSION = 4;
 
@@ -247,6 +246,10 @@ public abstract class MessageDatabase ex
     }
 
     private void startCheckpoint() {
+        if (checkpointInterval == 0 && cleanupInterval == 0) {
+            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean shutdown/restart");
+            return;
+        }
         synchronized (checkpointThreadLock) {
             boolean start = false;
             if (checkpointThread == null) {
@@ -264,15 +267,15 @@ public abstract class MessageDatabase ex
                             long lastCheckpoint = System.currentTimeMillis();
                             // Sleep for a short time so we can periodically check
                             // to see if we need to exit this thread.
-                            long sleepTime = Math.min(checkpointInterval, 500);
+                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
                             while (opened.get()) {
                                 Thread.sleep(sleepTime);
                                 long now = System.currentTimeMillis();
-                                if( now - lastCleanup >= cleanupInterval ) {
+                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
                                     checkpointCleanup(true);
                                     lastCleanup = now;
                                     lastCheckpoint = now;
-                                } else if( now - lastCheckpoint >= checkpointInterval ) {
+                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
                                     checkpointCleanup(false);
                                     lastCheckpoint = now;
                                 }
@@ -392,7 +395,9 @@ public abstract class MessageDatabase ex
                 }
                 journal.close();
                 synchronized (checkpointThreadLock) {
-                    checkpointThread.join();
+                    if (checkpointThread != null) {
+                        checkpointThread.join();
+                    }
                 }
             } finally {
                 lockFile.unlock();
@@ -781,6 +786,14 @@ public abstract class MessageDatabase ex
         }
     }
 
+    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
+        int size = data.serializedSizeFramed();
+        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+        os.writeByte(data.type().getNumber());
+        data.writeFramed(os);
+        return os.toByteSequence();
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Methods call by the broker to update and query the store.
     // /////////////////////////////////////////////////////////////////
@@ -788,12 +801,12 @@ public abstract class MessageDatabase ex
         return store(data, false, null,null);
     }
 
-    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
-        int size = data.serializedSizeFramed();
-        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-        os.writeByte(data.type().getNumber());
-        data.writeFramed(os);
-        return os.toByteSequence();
+    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
+        return store(data, false, null,null, onJournalStoreComplete);
+    }
+
+    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
+        return store(data, sync, before, after, null);
     }
 
     /**
@@ -802,14 +815,14 @@ public abstract class MessageDatabase ex
      * the JournalMessage is used to update the index just like it would be done
      * during a recovery process.
      */
-    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
+    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
         if (before != null) {
             before.run();
         }
         try {
             ByteSequence sequence = toByteSequence(data);
             long start = System.currentTimeMillis();
-            Location location = journal.write(sequence, sync);
+            Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
             long start2 = System.currentTimeMillis();
             process(data, location, after);
             long end = System.currentTimeMillis();
@@ -1408,13 +1421,25 @@ public abstract class MessageDatabase ex
         LOG.debug("Checkpoint done.");
     }
 
+    final Runnable nullCompletionCallback = new Runnable() {
+        @Override
+        public void run() {
+        }
+    };
     private Location checkpointProducerAudit() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oout = new ObjectOutputStream(baos);
         oout.writeObject(metadata.producerSequenceIdTracker);
         oout.flush();
         oout.close();
-        return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
+        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
+        Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
+        try {
+            location.getLatch().await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        }
+        return location;
     }
 
     public HashSet<Integer> getJournalFilesBeingReplicated() {
@@ -2076,6 +2101,7 @@ public abstract class MessageDatabase ex
         manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
         manager.setArchiveDataLogs(isArchiveDataLogs());
         manager.setSizeAccumulator(storeSize);
+        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
         if (getDirectoryArchive() != null) {
             IOHelper.mkdirs(getDirectoryArchive());
             manager.setDirectoryArchive(getDirectoryArchive());

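The producer-audit checkpoint above shows the pattern this commit relies on: store the
record with a no-op completion callback (so the journal forces to disk only when async disk
sync is enabled), then await the batch latch for write completion. A standalone sketch of
that wait-without-fsync idea, with hypothetical names:

    import java.io.InterruptedIOException;
    import java.util.concurrent.CountDownLatch;

    // Hypothetical stand-in for a journal write that completes asynchronously.
    class AsyncJournalWrite {
        final CountDownLatch latch = new CountDownLatch(1);
        private final Runnable onComplete;

        AsyncJournalWrite(Runnable onComplete) {
            this.onComplete = onComplete;
        }

        // Called by the appender thread once the batch is written (not necessarily synced).
        void complete() {
            onComplete.run();
            latch.countDown();
        }

        // Caller-side wait: blocks until the write completes, but never forces a sync.
        void awaitCompletion() throws InterruptedIOException {
            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new InterruptedIOException(e.toString());
            }
        }
    }
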
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java Fri Dec 23 15:44:52 2011
@@ -33,7 +33,11 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.kaha.impl.async.DataFileAppenderTest;
+import org.apache.kahadb.journal.FileAppender;
+import org.apache.kahadb.journal.Journal;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -52,14 +56,14 @@ public class KahaDBFastEnqueueTest {
     private boolean useBytesMessage= true;
     private final int parallelProducer = 20;
     private Vector<Exception> exceptions = new Vector<Exception>();
-    final long toSend = 500000;
+    long toSend = 10000;
 
-    @Ignore("too slow, exploring getting broker disk bound")
     // use with:
     // -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true
+    @Test
     public void testPublishNoConsumer() throws Exception {
 
-        startBroker(true);
+        startBroker(true, 10);
 
         final AtomicLong sharedCount = new AtomicLong(toSend);
         long start = System.currentTimeMillis();
@@ -82,19 +86,57 @@ public class KahaDBFastEnqueueTest {
         assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
         long totalSent  = toSend * payloadString.length();
 
-        //System.out.println("Pre shutdown: Index totalWritten:       " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten);
+        double duration =  System.currentTimeMillis() - start;
+        stopBroker();
+        LOG.info("Duration:                " + duration + "ms");
+        LOG.info("Rate:                       " + (toSend * 1000/duration) + "m/s");
+        LOG.info("Total send:             " + totalSent);
+        LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
+        LOG.info("Journal writes %:    " + kahaDBPersistenceAdapter.getStore().getJournal().length()
/ (double)totalSent * 100 + "%");
+
+        restartBroker(0, 1200000);
+        consumeMessages(toSend);
+    }
+
+    @Test
+    public void testPublishNoConsumerNoCheckpoint() throws Exception {
+
+        toSend = 100;
+        startBroker(true, 0);
+
+        final AtomicLong sharedCount = new AtomicLong(toSend);
+        long start = System.currentTimeMillis();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        for (int i=0; i< parallelProducer; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        publishMessages(sharedCount, 0);
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+        executorService.shutdown();
+        executorService.awaitTermination(30, TimeUnit.MINUTES);
+        assertTrue("Producers done in time", executorService.isTerminated());
+        assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
+        long totalSent  = toSend * payloadString.length();
+
+        broker.getAdminView().gc();
+
 
         double duration =  System.currentTimeMillis() - start;
         stopBroker();
-        System.out.println("Duration:                " + duration + "ms");
-        System.out.println("Rate:                       " + (toSend * 1000/duration) + "m/s");
-        System.out.println("Total send:             " + totalSent);
-        System.out.println("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
-        //System.out.println("Total index write:   " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten);
-        System.out.println("Journal writes %:    " + kahaDBPersistenceAdapter.getStore().getJournal().length()
/ (double)totalSent * 100 + "%");
-        //System.out.println("Index writes %:       " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten
/ (double)totalSent * 100 + "%");
+        LOG.info("Duration:                " + duration + "ms");
+        LOG.info("Rate:                       " + (toSend * 1000/duration) + "m/s");
+        LOG.info("Total send:             " + totalSent);
+        LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
+        LOG.info("Journal writes %:    " + kahaDBPersistenceAdapter.getStore().getJournal().length()
/ (double)totalSent * 100 + "%");
 
-        restartBroker(0);
+        restartBroker(0, 0);
         consumeMessages(toSend);
     }
 
@@ -110,10 +152,16 @@ public class KahaDBFastEnqueueTest {
         assertNull("none left over", consumer.receive(2000));
     }
 
-    private void restartBroker(int restartDelay) throws Exception {
+    private void restartBroker(int restartDelay, int checkpoint) throws Exception {
         stopBroker();
         TimeUnit.MILLISECONDS.sleep(restartDelay);
-        startBroker(false);
+        startBroker(false, checkpoint);
+    }
+
+    @Before
+    public void setProps() {
+        System.setProperty(Journal.CALLER_BUFFER_APPENDER, Boolean.toString(true));
+        System.setProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW, "10000");
     }
 
     @After
@@ -122,6 +170,8 @@ public class KahaDBFastEnqueueTest {
             broker.stop();
             broker.waitUntilStopped();
         }
+        System.clearProperty(Journal.CALLER_BUFFER_APPENDER);
+        System.clearProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW);
     }
 
     final double sampleRate = 100000;
@@ -153,14 +203,14 @@ public class KahaDBFastEnqueueTest {
         connection.close();
     }
 
-    public void startBroker(boolean deleteAllMessages) throws Exception {
+    public void startBroker(boolean deleteAllMessages, int checkPointPeriod) throws Exception {
         broker = new BrokerService();
         broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
         kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
         // defer checkpoints which require a sync
-        kahaDBPersistenceAdapter.setCleanupInterval(20 * 60 * 1000);
-        kahaDBPersistenceAdapter.setCheckpointInterval(20 * 60 * 1000);
+        kahaDBPersistenceAdapter.setCleanupInterval(checkPointPeriod);
+        kahaDBPersistenceAdapter.setCheckpointInterval(checkPointPeriod);
 
         // optimise for disk best batch rate
         kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24*1024*1024); //4mb default
@@ -171,7 +221,6 @@ public class KahaDBFastEnqueueTest {
         kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
         kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
 
-        broker.setUseJmx(false);
         broker.addConnector("tcp://0.0.0.0:0");
         broker.start();
 
@@ -179,6 +228,7 @@ public class KahaDBFastEnqueueTest {
         connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
     }
 
+    @Test
     public void testRollover() throws Exception {
         byte flip = 0x1;
         for (long i=0; i<Short.MAX_VALUE; i++) {

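The test now enables the caller-buffering appender and the write-statistics window through
system properties instead of command-line flags. A minimal sketch of doing the same
programmatically, using the constants this commit references (the wrapper class is
hypothetical):

    import org.apache.kahadb.journal.FileAppender;
    import org.apache.kahadb.journal.Journal;

    public class AppenderProps {
        public static void enableCallerBuffering() {
            // must be set before any Journal starts; FileAppender.maxStat is
            // read once in a static initializer
            System.setProperty(Journal.CALLER_BUFFER_APPENDER, Boolean.toString(true));
            System.setProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW, "10000");
        }
    }
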
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java Fri Dec 23 15:44:52 2011
@@ -17,17 +17,11 @@
 package org.apache.kahadb.journal;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.RandomAccessFile;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LinkedNodeList;
 
 /**
  * An optimized writer to do batch appends to a data file. This object is thread
@@ -37,69 +31,34 @@ import org.apache.kahadb.util.LinkedNode
  * reduces the round trip of the write thread.
  * 
  */
-class CallerBufferingDataFileAppender implements FileAppender {
-
-    protected final Journal journal;
-    protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
-    protected final Object enqueueMutex = new Object() {
-    };
-    protected WriteBatch nextWriteBatch;
-
-    protected boolean shutdown;
-    protected IOException firstAsyncException;
-    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
-    protected int maxWriteBatchSize;
-
-    private boolean running;
-    private Thread thread;
+class CallerBufferingDataFileAppender extends DataFileAppender {
 
     final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] {
             new DataByteArrayOutputStream(maxWriteBatchSize),
             new DataByteArrayOutputStream(maxWriteBatchSize)
     };
     volatile byte flip = 0x1;
-    public class WriteBatch {
+    public class WriteBatch extends DataFileAppender.WriteBatch {
 
         DataByteArrayOutputStream buff = cachedBuffers[flip ^= 1];
-        public final DataFile dataFile;
-
-        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
-        public final CountDownLatch latch = new CountDownLatch(1);
-		private final int offset;
-        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
-        public AtomicReference<IOException> exception = new AtomicReference<IOException>();
-        public boolean forceToDisk;
-
+        private boolean forceToDisk;
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
-            this.dataFile = dataFile;
-			this.offset = offset;
-            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
-            this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
-            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            super(dataFile, offset);
             initBuffer(buff);
             append(write);
         }
 
-        public boolean canAppend(Journal.WriteCommand write) {
-            int newSize = size + write.location.getSize();
-			if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
-                return false;
-            }
-            return true;
-        }
-
         public void append(Journal.WriteCommand write) throws IOException {
-            this.writes.addLast(write);
-            write.location.setDataFileId(dataFile.getDataFileId());
-            write.location.setOffset(offset+size);
-            int s = write.location.getSize();
-			size += s;
-            dataFile.incrementLength(s);
-            journal.addToTotalLength(s);
+            super.append(write);
             forceToDisk |= appendToBuffer(write, buff);
         }
     }
 
+    @Override
+    protected DataFileAppender.WriteBatch newWriteBatch(Journal.WriteCommand write, DataFile file) throws IOException {
+        return new WriteBatch(file, file.getLength(), write);
+    }
+
     private void initBuffer(DataByteArrayOutputStream buff) throws IOException {
         // Write an empty batch control record.
         buff.reset();
@@ -108,148 +67,10 @@ class CallerBufferingDataFileAppender im
         buff.writeLong(0);
     }
 
-    /**
-     * Construct a Store writer
-     */
     public CallerBufferingDataFileAppender(Journal dataManager) {
-        this.journal = dataManager;
-        this.inflightWrites = this.journal.getInflightWrites();
-        this.maxWriteBatchSize = this.journal.getWriteBatchSize();
-    }
-
-    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-    	
-        // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
-
-        final Location location = new Location();
-        location.setSize(size);
-        location.setType(type);
-
-        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
-
-        WriteBatch batch = enqueue(write);
-        location.setLatch(batch.latch);
-        if (sync) {
-            try {
-                batch.latch.await();
-            } catch (InterruptedException e) {
-                throw new InterruptedIOException();
-            }
-            IOException exception = batch.exception.get(); 
-            if (exception != null) {
-            	throw exception;
-            }
-        }	
-
-        return location;
-    }
-
-    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
-        // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
-
-        final Location location = new Location();
-        location.setSize(size);
-        location.setType(type);
-
-        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
-
-        WriteBatch batch = enqueue(write);
- 
-        location.setLatch(batch.latch);
-        return location;
-    }
-
-    private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
-        synchronized (enqueueMutex) {
-            if (shutdown) {
-                throw new IOException("Async Writter Thread Shutdown");
-            }
-            
-            if (!running) {
-                running = true;
-                thread = new Thread() {
-                    public void run() {
-                        processQueue();
-                    }
-                };
-                thread.setPriority(Thread.MAX_PRIORITY);
-                thread.setDaemon(true);
-                thread.setName("ActiveMQ Data File Writer");
-                thread.start();
-                firstAsyncException = null;
-            }
-            
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
-            }
-
-            while ( true ) {
-	            if (nextWriteBatch == null) {
-	            	DataFile file = journal.getCurrentWriteFile();
-	            	if( file.getLength() > journal.getMaxFileLength() ) {
-	            		file = journal.rotateWriteFile();
-	            	}
-
-	                nextWriteBatch = new WriteBatch(file, file.getLength(), write);
-	                enqueueMutex.notifyAll();
-	                break;
-	            } else {
-	                // Append to current batch if possible..
-	                if (nextWriteBatch.canAppend(write)) {
-	                    nextWriteBatch.append(write);
-	                    break;
-	                } else {
-	                    // Otherwise wait for the queuedCommand to be null
-	                    try {
-	                        while (nextWriteBatch != null) {
-	                            final long start = System.currentTimeMillis();
-	                            enqueueMutex.wait();
-	                            if (maxStat > 0) { 
-	                                System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
-	                            }
-	                        }
-	                    } catch (InterruptedException e) {
-	                        throw new InterruptedIOException();
-	                    }
-	                    if (shutdown) {
-	                        throw new IOException("Async Writter Thread Shutdown");
-	                    }
-	                }
-	            }
-            }
-            if (!write.sync) {
-                inflightWrites.put(new Journal.WriteKey(write.location), write);
-            }
-            return nextWriteBatch;
-        }
+        super(dataManager);
     }
 
-    public void close() throws IOException {
-        synchronized (enqueueMutex) {
-            if (!shutdown) {
-                shutdown = true;
-                if (running) {
-                    enqueueMutex.notifyAll();
-                } else {
-                    shutdownDone.countDown();
-                }
-            }
-        }
-
-        try {
-            shutdownDone.await();
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException();
-        }
-
-    }
-
-    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
-    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
-    int statIdx = 0;
-    int[] stats = new int[maxStat];
     /**
      * The async processing loop that writes to the data files and does the
      * force calls. Since the file sync() call is the slowest of all the
@@ -258,6 +79,7 @@ class CallerBufferingDataFileAppender im
      * accomplished attaching the same CountDownLatch instance to every force
      * request in a group.
      */
+    @Override
     protected void processQueue() {
         DataFile dataFile = null;
         RandomAccessFile file = null;
@@ -337,27 +159,9 @@ class CallerBufferingDataFileAppender im
 
                 Journal.WriteCommand lastWrite = wb.writes.getTail();
                 journal.setLastAppendLocation(lastWrite.location);
+                signalDone(wb);
 
-                // Now that the data is on disk, remove the writes from the in
-                // flight
-                // cache.
-                Journal.WriteCommand write = wb.writes.getHead();
-                while (write != null) {
-                    if (!write.sync) {
-                        inflightWrites.remove(new Journal.WriteKey(write.location));
-                    }
-                    if (write.onComplete != null) {
-                        try {
-                            write.onComplete.run();
-                        } catch (Throwable e) {
-                            e.printStackTrace();
-                        }
-                    }
-                    write = write.getNext();
-                }
 
-                // Signal any waiting threads that the write is on disk.
-                wb.latch.countDown();
             }
         } catch (IOException e) {
             synchronized (enqueueMutex) {
@@ -388,6 +192,6 @@ class CallerBufferingDataFileAppender im
         buff.writeInt(write.location.getSize());
         buff.writeByte(write.location.getType());
         buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
-        return write.sync | write.onComplete != null;
+        return write.sync | (syncOnComplete && write.onComplete != null);
     }
 }

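The refactor turns DataFileAppender into a template: enqueueing, shutdown and completion
signalling now live in the base class, and CallerBufferingDataFileAppender only overrides
the protected newWriteBatch factory plus processQueue. A generic sketch of that
factory-method shape (all names hypothetical):

    // Hypothetical illustration of the factory-method refactor used above.
    abstract class BatchingAppender {
        static class Batch {
            // shared bookkeeping: offsets, sizes, completion latch, ...
        }

        // The base class owns the queueing loop and calls this to create batches;
        // subclasses supply their own batch flavour.
        protected Batch newBatch() {
            return new Batch();
        }
    }

    class BufferingAppender extends BatchingAppender {
        static class BufferedBatch extends Batch {
            // adds a caller-side byte buffer on top of the shared bookkeeping
        }

        @Override
        protected Batch newBatch() {
            return new BufferedBatch(); // same base type, buffered behaviour
        }
    }
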
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Dec 23 15:44:52 2011
@@ -48,8 +48,9 @@ class DataFileAppender implements FileAp
     protected IOException firstAsyncException;
     protected final CountDownLatch shutdownDone = new CountDownLatch(1);
     protected int maxWriteBatchSize;
+    protected final boolean syncOnComplete;
 
-    private boolean running;
+    protected boolean running;
     private Thread thread;
 
     public static class WriteKey {
@@ -83,16 +84,20 @@ class DataFileAppender implements FileAp
 
         public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
         public final CountDownLatch latch = new CountDownLatch(1);
-		private final int offset;
+        protected final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
         public AtomicReference<IOException> exception = new AtomicReference<IOException>();
 
-        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
+        public WriteBatch(DataFile dataFile,int offset) {
             this.dataFile = dataFile;
 			this.offset = offset;
             this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
             this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
             journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+        }
+
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
+            this(dataFile, offset);
             append(write);
         }
 
@@ -122,6 +127,7 @@ class DataFileAppender implements FileAp
         this.journal = dataManager;
         this.inflightWrites = this.journal.getInflightWrites();
         this.maxWriteBatchSize = this.journal.getWriteBatchSize();
+        this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
     }
 
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
@@ -199,7 +205,7 @@ class DataFileAppender implements FileAp
 	            		file = journal.rotateWriteFile();
 	            	}
 
-	                nextWriteBatch = new WriteBatch(file, file.getLength(), write);
+	                nextWriteBatch = newWriteBatch(write, file);
 	                enqueueMutex.notifyAll();
 	                break;
 	            } else {
@@ -233,6 +239,10 @@ class DataFileAppender implements FileAp
         }
     }
 
+    protected WriteBatch newWriteBatch(Journal.WriteCommand write, DataFile file) throws IOException {
+        return new WriteBatch(file, file.getLength(), write);
+    }
+
     public void close() throws IOException {
         synchronized (enqueueMutex) {
             if (!shutdown) {
@@ -253,8 +263,6 @@ class DataFileAppender implements FileAp
 
     }
 
-    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
-    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
     int statIdx = 0;
     int[] stats = new int[maxStat];
     /**
@@ -317,7 +325,7 @@ class DataFileAppender implements FileAp
 
                 boolean forceToDisk = false;
                 while (write != null) {
-                    forceToDisk |= write.sync | write.onComplete != null;
+                    forceToDisk |= write.sync | (syncOnComplete && write.onComplete != null);
                     buff.writeInt(write.location.getSize());
                     buff.writeByte(write.location.getType());
                     buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
@@ -363,26 +371,7 @@ class DataFileAppender implements FileAp
                 Journal.WriteCommand lastWrite = wb.writes.getTail();
                 journal.setLastAppendLocation(lastWrite.location);
 
-                // Now that the data is on disk, remove the writes from the in
-                // flight
-                // cache.
-                write = wb.writes.getHead();
-                while (write != null) {
-                    if (!write.sync) {
-                        inflightWrites.remove(new Journal.WriteKey(write.location));
-                    }
-                    if (write.onComplete != null) {
-                        try {
-                            write.onComplete.run();
-                        } catch (Throwable e) {
-                            e.printStackTrace();
-                        }
-                    }
-                    write = write.getNext();
-                }
-
-                // Signal any waiting threads that the write is on disk.
-                wb.latch.countDown();
+                signalDone(wb);
             }
         } catch (IOException e) {
             synchronized (enqueueMutex) {
@@ -409,4 +398,26 @@ class DataFileAppender implements FileAp
         }
     }
 
+    protected void signalDone(WriteBatch wb) {
+        // Now that the data is on disk, remove the writes from the in
+        // flight
+        // cache.
+        Journal.WriteCommand write = wb.writes.getHead();
+        while (write != null) {
+            if (!write.sync) {
+                inflightWrites.remove(new Journal.WriteKey(write.location));
+            }
+            if (write.onComplete != null) {
+                try {
+                    write.onComplete.run();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+            write = write.getNext();
+        }
+
+        // Signal any waiting threads that the write is on disk.
+        wb.latch.countDown();
+    }
 }

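The behavioural change sits in the force-to-disk decision: previously any write carrying an
onComplete callback forced a sync; now that happens only when the journal's
enableAsyncDiskSync flag is set. A minimal standalone restatement of the rule (the wrapper
class is hypothetical; the expression mirrors the diff):

    public class ForceToDiskRule {
        // Mirrors: forceToDisk |= write.sync | (syncOnComplete && write.onComplete != null)
        public static boolean forceToDisk(boolean writeSync, boolean hasOnComplete,
                                          boolean syncOnComplete) {
            return writeSync | (syncOnComplete && hasOnComplete);
        }
    }

So with enableJournalDiskSyncs=false, the periodic checkpoint's callback write completes
without an fsync, which is the throughput win AMQ-3646 is after.
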
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java Fri Dec 23 15:44:52 2011
@@ -3,10 +3,10 @@ package org.apache.kahadb.journal;
 import java.io.IOException;
 import org.apache.kahadb.util.ByteSequence;
 
-/**
- * User: gtully
- */
 public interface FileAppender {
+    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
+    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
+
     Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException;
 
     Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException;

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=1222705&r1=1222704&r2=1222705&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Fri Dec 23 15:44:52 2011
@@ -118,6 +118,7 @@ public class Journal {
 	private ReplicationTarget replicationTarget;
     protected boolean checksum;
     protected boolean checkForCorruptionOnStartup;
+    protected boolean enableAsyncDiskSync = true;
     private Timer timer;
    
 
@@ -753,6 +754,14 @@ public class Journal {
        this.totalLength = storeSizeAccumulator;
     }
 
+    public void setEnableAsyncDiskSync(boolean val) {
+        this.enableAsyncDiskSync = val;
+    }
+
+    public boolean isEnableAsyncDiskSync() {
+        return enableAsyncDiskSync;
+    }
+
     public static class WriteCommand extends LinkedNode<WriteCommand> {
         public final Location location;
         public final ByteSequence data;


