Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5EE1B74E4 for ; Fri, 23 Dec 2011 15:45:16 +0000 (UTC) Received: (qmail 19472 invoked by uid 500); 23 Dec 2011 15:45:16 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 19435 invoked by uid 500); 23 Dec 2011 15:45:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 19428 invoked by uid 99); 23 Dec 2011 15:45:16 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Dec 2011 15:45:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Dec 2011 15:45:13 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 24D2E23889E1 for ; Fri, 23 Dec 2011 15:44:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1222705 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/... 
Date: Fri, 23 Dec 2011 15:44:52 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111223154453.24D2E23889E1@eris.apache.org> Author: gtully Date: Fri Dec 23 15:44:52 2011 New Revision: 1222705 URL: http://svn.apache.org/viewvc?rev=1222705&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3646 - Allow KahaDB to run without disk syncs, higher throughput without the jms persistence guarantee. Allow 0 period to disable checkpoint/cleanup. Allow jmx gc operation to invoke cleanup so that store gc can be initiated via jmx to ensure disk is reclaimed. Ensure periodic checkpoint does not sync when enableJournalDiskSyncs=false; it waits for completion but does not force to disk. Fix cached buffer allocation and refactor to reuse more code in CallerBufferingDataFileAppender Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1222705&r1=1222704&r2=1222705&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Fri Dec 23 15:44:52 2011 @@ -38,12 +38,14 @@ import org.apache.activemq.command.Consu import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.BrokerSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * */ public class BrokerView implements BrokerViewMBean { - + private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class); ManagedRegionBroker broker; private final BrokerService brokerService; private final AtomicInteger sessionIdCounter = new AtomicInteger(0); @@ -76,6 +78,11 @@ public class BrokerView implements Broke public void gc() throws Exception { brokerService.getBroker().gc(); + try { + brokerService.getPersistenceAdapter().checkpoint(true); + } catch (IOException e) { + LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e); + } } public void start() throws Exception { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1222705&r1=1222704&r2=1222705&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Dec 23 15:44:52 2011 @@ -980,7 +980,7 @@ public class KahaDBStore extends Message } public void checkpoint(boolean sync) throws IOException { - super.checkpointCleanup(false); + super.checkpointCleanup(sync); } // ///////////////////////////////////////////////////////////////// Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1222705&r1=1222704&r2=1222705&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Dec 23 15:44:52 2011 @@ -67,7 +67,6 @@ public abstract class MessageDatabase ex static final int CLOSED_STATE = 1; static final int OPEN_STATE = 2; static final long NOT_ACKED = -1; - static final long UNMATCHED_SEQ = -2; static final int VERSION = 4; @@ -247,6 +246,10 @@ public abstract class MessageDatabase ex } private void startCheckpoint() { + if (checkpointInterval == 0 && cleanupInterval == 0) { + LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); + return; + } synchronized (checkpointThreadLock) { boolean start = false; if (checkpointThread == null) { @@ -264,15 +267,15 @@ public abstract class MessageDatabase ex long lastCheckpoint = System.currentTimeMillis(); // Sleep for a short time so we can periodically check // to see if we need to exit this thread. - long sleepTime = Math.min(checkpointInterval, 500); + long sleepTime = Math.min(checkpointInterval > 0 ? 
checkpointInterval : cleanupInterval, 500); while (opened.get()) { Thread.sleep(sleepTime); long now = System.currentTimeMillis(); - if( now - lastCleanup >= cleanupInterval ) { + if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { checkpointCleanup(true); lastCleanup = now; lastCheckpoint = now; - } else if( now - lastCheckpoint >= checkpointInterval ) { + } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { checkpointCleanup(false); lastCheckpoint = now; } @@ -392,7 +395,9 @@ public abstract class MessageDatabase ex } journal.close(); synchronized (checkpointThreadLock) { - checkpointThread.join(); + if (checkpointThread != null) { + checkpointThread.join(); + } } } finally { lockFile.unlock(); @@ -781,6 +786,14 @@ public abstract class MessageDatabase ex } } + public ByteSequence toByteSequence(JournalCommand data) throws IOException { + int size = data.serializedSizeFramed(); + DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); + os.writeByte(data.type().getNumber()); + data.writeFramed(os); + return os.toByteSequence(); + } + // ///////////////////////////////////////////////////////////////// // Methods call by the broker to update and query the store. 
// ///////////////////////////////////////////////////////////////// @@ -788,12 +801,12 @@ public abstract class MessageDatabase ex return store(data, false, null,null); } - public ByteSequence toByteSequence(JournalCommand data) throws IOException { - int size = data.serializedSizeFramed(); - DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); - os.writeByte(data.type().getNumber()); - data.writeFramed(os); - return os.toByteSequence(); + public Location store(JournalCommand data, Runnable onJournalStoreComplete) throws IOException { + return store(data, false, null,null, onJournalStoreComplete); + } + + public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException { + return store(data, sync, before, after, null); } /** @@ -802,14 +815,14 @@ public abstract class MessageDatabase ex * the JournalMessage is used to update the index just like it would be done * during a recovery process. */ - public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException { + public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException { if (before != null) { before.run(); } try { ByteSequence sequence = toByteSequence(data); long start = System.currentTimeMillis(); - Location location = journal.write(sequence, sync); + Location location = onJournalStoreComplete == null ? 
journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; long start2 = System.currentTimeMillis(); process(data, location, after); long end = System.currentTimeMillis(); @@ -1408,13 +1421,25 @@ public abstract class MessageDatabase ex LOG.debug("Checkpoint done."); } + final Runnable nullCompletionCallback = new Runnable() { + @Override + public void run() { + } + }; private Location checkpointProducerAudit() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oout = new ObjectOutputStream(baos); oout.writeObject(metadata.producerSequenceIdTracker); oout.flush(); oout.close(); - return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null); + // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false + Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); + try { + location.getLatch().await(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.toString()); + } + return location; } public HashSet getJournalFilesBeingReplicated() { @@ -2076,6 +2101,7 @@ public abstract class MessageDatabase ex manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); manager.setArchiveDataLogs(isArchiveDataLogs()); manager.setSizeAccumulator(storeSize); + manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); if (getDirectoryArchive() != null) { IOHelper.mkdirs(getDirectoryArchive()); manager.setDirectoryArchive(getDirectoryArchive()); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java?rev=1222705&r1=1222704&r2=1222705&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java Fri Dec 23 15:44:52 2011 @@ -33,7 +33,11 @@ import org.apache.activemq.ActiveMQConne import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.kaha.impl.async.DataFileAppenderTest; +import org.apache.kahadb.journal.FileAppender; +import org.apache.kahadb.journal.Journal; import org.junit.After; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -52,14 +56,14 @@ public class KahaDBFastEnqueueTest { private boolean useBytesMessage= true; private final int parallelProducer = 20; private Vector exceptions = new Vector(); - final long toSend = 500000; + long toSend = 10000; - @Ignore("too slow, exploring getting broker disk bound") // use with: // -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true + @Test public void testPublishNoConsumer() throws Exception { - startBroker(true); + startBroker(true, 10); final AtomicLong sharedCount = new AtomicLong(toSend); long start = System.currentTimeMillis(); @@ -82,19 +86,57 @@ public class KahaDBFastEnqueueTest { assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); long totalSent = toSend * payloadString.length(); - //System.out.println("Pre shutdown: Index totalWritten: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten); + double duration = System.currentTimeMillis() - start; + stopBroker(); + LOG.info("Duration: " + duration + "ms"); + LOG.info("Rate: " + (toSend * 1000/duration) + "m/s"); + LOG.info("Total send: " + totalSent); + LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length()); + 
LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%"); + + restartBroker(0, 1200000); + consumeMessages(toSend); + } + + @Test + public void testPublishNoConsumerNoCheckpoint() throws Exception { + + toSend = 100; + startBroker(true, 0); + + final AtomicLong sharedCount = new AtomicLong(toSend); + long start = System.currentTimeMillis(); + ExecutorService executorService = Executors.newCachedThreadPool(); + for (int i=0; i< parallelProducer; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + publishMessages(sharedCount, 0); + } catch (Exception e) { + exceptions.add(e); + } + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.MINUTES); + assertTrue("Producers done in time", executorService.isTerminated()); + assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); + long totalSent = toSend * payloadString.length(); + + broker.getAdminView().gc(); + double duration = System.currentTimeMillis() - start; stopBroker(); - System.out.println("Duration: " + duration + "ms"); - System.out.println("Rate: " + (toSend * 1000/duration) + "m/s"); - System.out.println("Total send: " + totalSent); - System.out.println("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length()); - //System.out.println("Total index write: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten); - System.out.println("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%"); - //System.out.println("Index writes %: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten / (double)totalSent * 100 + "%"); + LOG.info("Duration: " + duration + "ms"); + LOG.info("Rate: " + (toSend * 1000/duration) + "m/s"); + LOG.info("Total send: " + totalSent); + LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length()); + 
LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%"); - restartBroker(0); + restartBroker(0, 0); consumeMessages(toSend); } @@ -110,10 +152,16 @@ public class KahaDBFastEnqueueTest { assertNull("none left over", consumer.receive(2000)); } - private void restartBroker(int restartDelay) throws Exception { + private void restartBroker(int restartDelay, int checkpoint) throws Exception { stopBroker(); TimeUnit.MILLISECONDS.sleep(restartDelay); - startBroker(false); + startBroker(false, checkpoint); + } + + @Before + public void setProps() { + System.setProperty(Journal.CALLER_BUFFER_APPENDER, Boolean.toString(true)); + System.setProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW, "10000"); } @After @@ -122,6 +170,8 @@ public class KahaDBFastEnqueueTest { broker.stop(); broker.waitUntilStopped(); } + System.clearProperty(Journal.CALLER_BUFFER_APPENDER); + System.clearProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW); } final double sampleRate = 100000; @@ -153,14 +203,14 @@ public class KahaDBFastEnqueueTest { connection.close(); } - public void startBroker(boolean deleteAllMessages) throws Exception { + public void startBroker(boolean deleteAllMessages, int checkPointPeriod) throws Exception { broker = new BrokerService(); broker.setDeleteAllMessagesOnStartup(deleteAllMessages); kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false); // defer checkpoints which require a sync - kahaDBPersistenceAdapter.setCleanupInterval(20 * 60 * 1000); - kahaDBPersistenceAdapter.setCheckpointInterval(20 * 60 * 1000); + kahaDBPersistenceAdapter.setCleanupInterval(checkPointPeriod); + kahaDBPersistenceAdapter.setCheckpointInterval(checkPointPeriod); // optimise for disk best batch rate kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24*1024*1024); //4mb default @@ -171,7 +221,6 @@ public class KahaDBFastEnqueueTest { 
kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false); kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false); - broker.setUseJmx(false); broker.addConnector("tcp://0.0.0.0:0"); broker.start(); @@ -179,6 +228,7 @@ public class KahaDBFastEnqueueTest { connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options); } + @Test public void testRollover() throws Exception { byte flip = 0x1; for (long i=0; i inflightWrites; - protected final Object enqueueMutex = new Object() { - }; - protected WriteBatch nextWriteBatch; - - protected boolean shutdown; - protected IOException firstAsyncException; - protected final CountDownLatch shutdownDone = new CountDownLatch(1); - protected int maxWriteBatchSize; - - private boolean running; - private Thread thread; +class CallerBufferingDataFileAppender extends DataFileAppender { final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] { new DataByteArrayOutputStream(maxWriteBatchSize), new DataByteArrayOutputStream(maxWriteBatchSize) }; volatile byte flip = 0x1; - public class WriteBatch { + public class WriteBatch extends DataFileAppender.WriteBatch { DataByteArrayOutputStream buff = cachedBuffers[flip ^= 1]; - public final DataFile dataFile; - - public final LinkedNodeList writes = new LinkedNodeList(); - public final CountDownLatch latch = new CountDownLatch(1); - private final int offset; - public int size = Journal.BATCH_CONTROL_RECORD_SIZE; - public AtomicReference exception = new AtomicReference(); - public boolean forceToDisk; - + private boolean forceToDisk; public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException { - this.dataFile = dataFile; - this.offset = offset; - this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE); - this.size=Journal.BATCH_CONTROL_RECORD_SIZE; - journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE); + super(dataFile, offset); initBuffer(buff); append(write); } 
- public boolean canAppend(Journal.WriteCommand write) { - int newSize = size + write.location.getSize(); - if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) { - return false; - } - return true; - } - public void append(Journal.WriteCommand write) throws IOException { - this.writes.addLast(write); - write.location.setDataFileId(dataFile.getDataFileId()); - write.location.setOffset(offset+size); - int s = write.location.getSize(); - size += s; - dataFile.incrementLength(s); - journal.addToTotalLength(s); + super.append(write); forceToDisk |= appendToBuffer(write, buff); } } + @Override + protected DataFileAppender.WriteBatch newWriteBatch(Journal.WriteCommand write, DataFile file) throws IOException { + return new WriteBatch(file, file.getLength(), write); + } + private void initBuffer(DataByteArrayOutputStream buff) throws IOException { // Write an empty batch control record. buff.reset(); @@ -108,148 +67,10 @@ class CallerBufferingDataFileAppender im buff.writeLong(0); } - /** - * Construct a Store writer - */ public CallerBufferingDataFileAppender(Journal dataManager) { - this.journal = dataManager; - this.inflightWrites = this.journal.getInflightWrites(); - this.maxWriteBatchSize = this.journal.getWriteBatchSize(); - } - - public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException { - - // Write the packet our internal buffer. 
- int size = data.getLength() + Journal.RECORD_HEAD_SPACE; - - final Location location = new Location(); - location.setSize(size); - location.setType(type); - - Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync); - - WriteBatch batch = enqueue(write); - location.setLatch(batch.latch); - if (sync) { - try { - batch.latch.await(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - IOException exception = batch.exception.get(); - if (exception != null) { - throw exception; - } - } - - return location; - } - - public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException { - // Write the packet our internal buffer. - int size = data.getLength() + Journal.RECORD_HEAD_SPACE; - - final Location location = new Location(); - location.setSize(size); - location.setType(type); - - Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete); - - WriteBatch batch = enqueue(write); - - location.setLatch(batch.latch); - return location; - } - - private WriteBatch enqueue(Journal.WriteCommand write) throws IOException { - synchronized (enqueueMutex) { - if (shutdown) { - throw new IOException("Async Writter Thread Shutdown"); - } - - if (!running) { - running = true; - thread = new Thread() { - public void run() { - processQueue(); - } - }; - thread.setPriority(Thread.MAX_PRIORITY); - thread.setDaemon(true); - thread.setName("ActiveMQ Data File Writer"); - thread.start(); - firstAsyncException = null; - } - - if (firstAsyncException != null) { - throw firstAsyncException; - } - - while ( true ) { - if (nextWriteBatch == null) { - DataFile file = journal.getCurrentWriteFile(); - if( file.getLength() > journal.getMaxFileLength() ) { - file = journal.rotateWriteFile(); - } - - nextWriteBatch = new WriteBatch(file, file.getLength(), write); - enqueueMutex.notifyAll(); - break; - } else { - // Append to current batch if possible.. 
- if (nextWriteBatch.canAppend(write)) { - nextWriteBatch.append(write); - break; - } else { - // Otherwise wait for the queuedCommand to be null - try { - while (nextWriteBatch != null) { - final long start = System.currentTimeMillis(); - enqueueMutex.wait(); - if (maxStat > 0) { - System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start)); - } - } - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - if (shutdown) { - throw new IOException("Async Writter Thread Shutdown"); - } - } - } - } - if (!write.sync) { - inflightWrites.put(new Journal.WriteKey(write.location), write); - } - return nextWriteBatch; - } + super(dataManager); } - public void close() throws IOException { - synchronized (enqueueMutex) { - if (!shutdown) { - shutdown = true; - if (running) { - enqueueMutex.notifyAll(); - } else { - shutdownDone.countDown(); - } - } - } - - try { - shutdownDone.await(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - - } - - public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW"; - public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0")); - int statIdx = 0; - int[] stats = new int[maxStat]; /** * The async processing loop that writes to the data files and does the * force calls. Since the file sync() call is the slowest of all the @@ -258,6 +79,7 @@ class CallerBufferingDataFileAppender im * accomplished attaching the same CountDownLatch instance to every force * request in a group. 
*/ + @Override protected void processQueue() { DataFile dataFile = null; RandomAccessFile file = null; @@ -337,27 +159,9 @@ class CallerBufferingDataFileAppender im Journal.WriteCommand lastWrite = wb.writes.getTail(); journal.setLastAppendLocation(lastWrite.location); + signalDone(wb); - // Now that the data is on disk, remove the writes from the in - // flight - // cache. - Journal.WriteCommand write = wb.writes.getHead(); - while (write != null) { - if (!write.sync) { - inflightWrites.remove(new Journal.WriteKey(write.location)); - } - if (write.onComplete != null) { - try { - write.onComplete.run(); - } catch (Throwable e) { - e.printStackTrace(); - } - } - write = write.getNext(); - } - // Signal any waiting threads that the write is on disk. - wb.latch.countDown(); } } catch (IOException e) { synchronized (enqueueMutex) { @@ -388,6 +192,6 @@ class CallerBufferingDataFileAppender im buff.writeInt(write.location.getSize()); buff.writeByte(write.location.getType()); buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength()); - return write.sync | write.onComplete != null; + return write.sync | (syncOnComplete && write.onComplete != null); } } Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=1222705&r1=1222704&r2=1222705&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Dec 23 15:44:52 2011 @@ -48,8 +48,9 @@ class DataFileAppender implements FileAp protected IOException firstAsyncException; protected final CountDownLatch shutdownDone = new CountDownLatch(1); protected int maxWriteBatchSize; + protected final boolean syncOnComplete; - private boolean 
running; + protected boolean running; private Thread thread; public static class WriteKey { @@ -83,16 +84,20 @@ class DataFileAppender implements FileAp public final LinkedNodeList writes = new LinkedNodeList(); public final CountDownLatch latch = new CountDownLatch(1); - private final int offset; + protected final int offset; public int size = Journal.BATCH_CONTROL_RECORD_SIZE; public AtomicReference exception = new AtomicReference(); - public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException { + public WriteBatch(DataFile dataFile,int offset) { this.dataFile = dataFile; this.offset = offset; this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE); this.size=Journal.BATCH_CONTROL_RECORD_SIZE; journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE); + } + + public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException { + this(dataFile, offset); append(write); } @@ -122,6 +127,7 @@ class DataFileAppender implements FileAp this.journal = dataManager; this.inflightWrites = this.journal.getInflightWrites(); this.maxWriteBatchSize = this.journal.getWriteBatchSize(); + this.syncOnComplete = this.journal.isEnableAsyncDiskSync(); } public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException { @@ -199,7 +205,7 @@ class DataFileAppender implements FileAp file = journal.rotateWriteFile(); } - nextWriteBatch = new WriteBatch(file, file.getLength(), write); + nextWriteBatch = newWriteBatch(write, file); enqueueMutex.notifyAll(); break; } else { @@ -233,6 +239,10 @@ class DataFileAppender implements FileAp } } + protected WriteBatch newWriteBatch(Journal.WriteCommand write, DataFile file) throws IOException { + return new WriteBatch(file, file.getLength(), write); + } + public void close() throws IOException { synchronized (enqueueMutex) { if (!shutdown) { @@ -253,8 +263,6 @@ class DataFileAppender implements FileAp } - public static final String 
PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW"; - public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0")); int statIdx = 0; int[] stats = new int[maxStat]; /** @@ -317,7 +325,7 @@ class DataFileAppender implements FileAp boolean forceToDisk = false; while (write != null) { - forceToDisk |= write.sync | write.onComplete != null; + forceToDisk |= write.sync | (syncOnComplete && write.onComplete != null); buff.writeInt(write.location.getSize()); buff.writeByte(write.location.getType()); buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength()); @@ -363,26 +371,7 @@ class DataFileAppender implements FileAp Journal.WriteCommand lastWrite = wb.writes.getTail(); journal.setLastAppendLocation(lastWrite.location); - // Now that the data is on disk, remove the writes from the in - // flight - // cache. - write = wb.writes.getHead(); - while (write != null) { - if (!write.sync) { - inflightWrites.remove(new Journal.WriteKey(write.location)); - } - if (write.onComplete != null) { - try { - write.onComplete.run(); - } catch (Throwable e) { - e.printStackTrace(); - } - } - write = write.getNext(); - } - - // Signal any waiting threads that the write is on disk. - wb.latch.countDown(); + signalDone(wb); } } catch (IOException e) { synchronized (enqueueMutex) { @@ -409,4 +398,26 @@ class DataFileAppender implements FileAp } } + protected void signalDone(WriteBatch wb) { + // Now that the data is on disk, remove the writes from the in + // flight + // cache. + Journal.WriteCommand write = wb.writes.getHead(); + while (write != null) { + if (!write.sync) { + inflightWrites.remove(new Journal.WriteKey(write.location)); + } + if (write.onComplete != null) { + try { + write.onComplete.run(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + write = write.getNext(); + } + + // Signal any waiting threads that the write is on disk. 
+ wb.latch.countDown(); + } } Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java?rev=1222705&r1=1222704&r2=1222705&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java Fri Dec 23 15:44:52 2011 @@ -3,10 +3,10 @@ package org.apache.kahadb.journal; import java.io.IOException; import org.apache.kahadb.util.ByteSequence; -/** - * User: gtully - */ public interface FileAppender { + public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW"; + public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0")); + Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException; Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException; Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=1222705&r1=1222704&r2=1222705&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Fri Dec 23 15:44:52 2011 @@ -118,6 +118,7 @@ public class Journal { private ReplicationTarget replicationTarget; protected boolean checksum; protected boolean checkForCorruptionOnStartup; + protected boolean enableAsyncDiskSync = true; private Timer timer; @@ -753,6 +754,14 @@ public class Journal { this.totalLength = storeSizeAccumulator; } + public void 
setEnableAsyncDiskSync(boolean val) { + this.enableAsyncDiskSync = val; + } + + public boolean isEnableAsyncDiskSync() { + return enableAsyncDiskSync; + } + public static class WriteCommand extends LinkedNode { public final Location location; public final ByteSequence data;