activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6377
Date Thu, 28 Jul 2016 14:07:40 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 1bdcb4f96 -> 1a598277c


https://issues.apache.org/jira/browse/AMQ-6377

Reworking the patch so that the periodic disk sync is triggered by
writing a journal trace command, so that the sync is performed in the
same thread that does the journal writes


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1a598277
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1a598277
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1a598277

Branch: refs/heads/master
Commit: 1a598277cf76e9bace980dfa87982584b5879710
Parents: 1bdcb4f
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Thu Jul 28 09:36:28 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Thu Jul 28 10:07:10 2016 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  18 ++-
 .../CallerBufferingDataFileAppender.java        |  16 ++
 .../kahadb/disk/journal/DataFileAppender.java   |  14 +-
 .../store/kahadb/disk/journal/Journal.java      |  21 ---
 .../store/kahadb/JournalSyncStrategyTest.java   | 133 ++++++++++++++++
 .../DataFileAppenderSyncStrategyTest.java       |  93 ++++++++++++
 .../disk/journal/JournalSyncStrategyTest.java   | 152 -------------------
 7 files changed, 271 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1a598277/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index a58d614..8c79c66 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
@@ -288,6 +289,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private int journalLogOnLastCompactionCheck;
     private boolean enableSubscriptionStatistics = false;
 
+    //only set when using JournalDiskSyncStrategy.PERIODIC
+    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
+
     @Override
     public void doStart() throws Exception {
         load();
@@ -392,6 +396,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         private long lastCheckpoint = System.currentTimeMillis();
         private long lastCleanup = System.currentTimeMillis();
         private long lastSync = System.currentTimeMillis();
+        private Location lastAsyncUpdate = null;
 
         @Override
         public void run() {
@@ -401,7 +406,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     long now = System.currentTimeMillis();
                     if (journal.isJournalDiskSyncPeriodic() &&
                             journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
-                        journal.syncCurrentDataFile();
+                        Location currentUpdate = lastAsyncJournalUpdate.get();
+                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
+                            lastAsyncUpdate = currentUpdate;
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Writing trace command to trigger journal sync");
+                            }
+                            store(new KahaTraceCommand(), true, null, null);
+                        }
                         lastSync = now;
                     }
                     if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
@@ -1095,6 +1107,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 long start = System.currentTimeMillis();
                 location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
                 long start2 = System.currentTimeMillis();
+                //Track the last async update so we know if we need to sync at the next checkpoint
+                if (!sync && journal.isJournalDiskSyncPeriodic()) {
+                    lastAsyncJournalUpdate.set(location);
+                }
                 process(data, location, before);
 
                 long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a598277/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
index c6b143b..d6a9d2e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
@@ -23,6 +23,8 @@ import java.util.zip.Checksum;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An optimized writer to do batch appends to a data file. This object is thread
@@ -34,6 +36,8 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
  */
 class CallerBufferingDataFileAppender extends DataFileAppender {
 
+    private static final Logger logger = LoggerFactory.getLogger(CallerBufferingDataFileAppender.class);
+
     final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] {
             new DataByteArrayOutputStream(maxWriteBatchSize),
             new DataByteArrayOutputStream(maxWriteBatchSize)
@@ -111,6 +115,12 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
                 wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
+                        if (periodicSync) {
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Syning file {} on rotate", dataFile.getFile().getName());
+                            }
+                            file.sync();
+                        }
                         dataFile.closeRandomAccessFile(file);
                     }
                     dataFile = wb.dataFile;
@@ -177,6 +187,12 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
         } finally {
             try {
                 if (file != null) {
+                    if (periodicSync) {
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Syning file {} on close", dataFile.getFile().getName());
+                        }
+                        file.sync();
+                    }
                     dataFile.closeRandomAccessFile(file);
                 }
             } catch (Throwable ignore) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a598277/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 8df834c..1e87331 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -284,6 +284,12 @@ class DataFileAppender implements FileAppender {
 
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
+                        if (periodicSync) {
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Syning file {} on rotate", dataFile.getFile().getName());
+                            }
+                            file.sync();
+                        }
                         dataFile.closeRandomAccessFile(file);
                     }
                     dataFile = wb.dataFile;
@@ -342,8 +348,6 @@ class DataFileAppender implements FileAppender {
 
                 if (forceToDisk) {
                     file.sync();
-                } else if (periodicSync) {
-                    journal.currentFileNeedSync.set(true);
                 }
 
                 Journal.WriteCommand lastWrite = wb.writes.getTail();
@@ -368,6 +372,12 @@ class DataFileAppender implements FileAppender {
         } finally {
             try {
                 if (file != null) {
+                    if (periodicSync) {
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Syning file {} on close", dataFile.getFile().getName());
+                        }
+                        file.sync();
+                    }
                     dataFile.closeRandomAccessFile(file);
                 }
             } catch (Throwable ignore) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a598277/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 93d5f7f..e526698 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
@@ -84,7 +83,6 @@ public class Journal {
     public static final byte EOF_EOT = '4';
     public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
 
-    protected final AtomicBoolean currentFileNeedSync = new AtomicBoolean();
     private ScheduledExecutorService scheduler;
 
     // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
@@ -597,7 +595,6 @@ public class Journal {
                 dataFile = newDataFile();
             }
             synchronized (currentDataFile) {
-                syncCurrentDataFile();
                 fileMap.put(dataFile.getDataFileId(), dataFile);
                 fileByFileMap.put(dataFile.getFile(), dataFile);
                 dataFiles.addLast(dataFile);
@@ -610,23 +607,6 @@ public class Journal {
         }
     }
 
-    public void syncCurrentDataFile() throws IOException {
-        synchronized (currentDataFile) {
-            DataFile dataFile = currentDataFile.get();
-            if (dataFile != null && isJournalDiskSyncPeriodic()) {
-                if (currentFileNeedSync.compareAndSet(true, false)) {
-                    LOG.trace("Syncing Journal file: {}", dataFile.getFile().getName());
-                    RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
-                    try {
-                        file.sync();
-                    } finally {
-                        file.close();
-                    }
-                }
-            }
-        }
-    }
-
     private Runnable preAllocateNextDataFileTask = new Runnable() {
         @Override
         public void run() {
@@ -705,7 +685,6 @@ public class Journal {
         // the appender can be calling back to to the journal blocking a close AMQ-5620
         appender.close();
         synchronized (currentDataFile) {
-            syncCurrentDataFile();
             fileMap.clear();
             fileByFileMap.clear();
             dataFiles.clear();

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a598277/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalSyncStrategyTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalSyncStrategyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalSyncStrategyTest.java
new file mode 100644
index 0000000..c22339f
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalSyncStrategyTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.disk.journal.FileAppender;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+public class JournalSyncStrategyTest  {
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    @Rule
+    public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS);
+
+    private KahaDBStore store;
+    private int defaultJournalLength = 10 * 1024;
+
+    @After
+    public void after() throws Exception {
+        if (store != null) {
+            store.stop();
+        }
+    }
+
+    @Test
+    public void testPeriodicSync()throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.PERIODIC);
+        store.setJournalDiskSyncInterval(800);
+        store.start();
+        final Journal journal = store.getJournal();
+        assertTrue(journal.isJournalDiskSyncPeriodic());
+        assertFalse(store.isEnableJournalDiskSyncs());
+        assertEquals(store.getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.PERIODIC.name());
+        assertEquals(store.getJournal().getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.PERIODIC);
+        assertEquals(store.getJournalDiskSyncInterval(), 800);
+
+        Location l = store.lastAsyncJournalUpdate.get();
+
+        //write a message to the store
+        MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
+        writeMessage(messageStore, 1);
+
+        //make sure message write causes the lastAsyncJournalUpdate to be set with a new value
+        assertFalse(store.lastAsyncJournalUpdate.get().equals(l));
+    }
+
+    @Test
+    public void testAlwaysSync()throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.ALWAYS);
+        store.start();
+        assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
+        assertTrue(store.isEnableJournalDiskSyncs());
+        assertEquals(store.getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.ALWAYS.name());
+        assertEquals(store.getJournal().getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.ALWAYS);
+
+        MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
+        writeMessage(messageStore, 1);
+        assertNull(store.lastAsyncJournalUpdate.get());
+    }
+
+    @Test
+    public void testNeverSync() throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.NEVER);
+        store.start();
+        assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
+        assertFalse(store.isEnableJournalDiskSyncs());
+        assertEquals(store.getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.NEVER.name());
+        assertEquals(store.getJournal().getJournalDiskSyncStrategy(), JournalDiskSyncStrategy.NEVER);
+
+        MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
+        writeMessage(messageStore, 1);
+        assertNull(store.lastAsyncJournalUpdate.get());
+    }
+
+    private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {
+        KahaDBStore store = new KahaDBStore();
+        store.setJournalMaxFileLength(defaultJournalLength);
+        store.deleteAllMessages();
+        store.setDirectory(dataFileDir.getRoot());
+        if (strategy != null) {
+            store.setJournalDiskSyncStrategy(strategy.name());
+        }
+
+        return store;
+    }
+
+    private void writeMessage(final MessageStore messageStore, int num) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("testtesttest");
+        MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + num);
+        messageId.setBrokerSequenceId(num);
+        message.setMessageId(messageId);
+        messageStore.addMessage(new ConnectionContext(), message);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a598277/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderSyncStrategyTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderSyncStrategyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderSyncStrategyTest.java
new file mode 100644
index 0000000..494e256
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderSyncStrategyTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+public class DataFileAppenderSyncStrategyTest  {
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    @Rule
+    public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS);
+
+    private KahaDBStore store;
+    private int defaultJournalLength = 10 * 1024;
+
+    @After
+    public void after() throws Exception {
+        if (store != null) {
+            store.stop();
+        }
+    }
+
+    @Test
+    public void testPeriodicSync()throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.PERIODIC);
+        store.start();
+        final Journal journal = store.getJournal();
+
+        DataFileAppender appender = (DataFileAppender) journal.appender;
+        assertTrue(appender.periodicSync);
+    }
+
+    @Test
+    public void testAlwaysSync()throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.ALWAYS);
+        store.start();
+        final Journal journal = store.getJournal();
+
+        DataFileAppender appender = (DataFileAppender) journal.appender;
+        assertFalse(appender.periodicSync);
+    }
+
+    @Test
+    public void testNeverSync() throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.NEVER);
+        store.start();
+        final Journal journal = store.getJournal();
+
+        DataFileAppender appender = (DataFileAppender) journal.appender;
+        assertFalse(appender.periodicSync);
+    }
+
+    private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {
+        KahaDBStore store = new KahaDBStore();
+        store.setJournalMaxFileLength(defaultJournalLength);
+        store.deleteAllMessages();
+        store.setDirectory(dataFileDir.getRoot());
+        if (strategy != null) {
+            store.setJournalDiskSyncStrategy(strategy.name());
+        }
+
+        return store;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a598277/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
deleted file mode 100644
index af618d5..0000000
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.disk.journal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
-import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
-
-public class JournalSyncStrategyTest  {
-
-    @Rule
-    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
-
-    @Rule
-    public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS);
-
-    private KahaDBStore store;
-    private int defaultJournalLength = 10 * 1024;
-
-    @After
-    public void after() throws Exception {
-        if (store != null) {
-            store.stop();
-        }
-    }
-
-    @Test
-    public void testPeriodicSync()throws Exception {
-        store = configureStore(JournalDiskSyncStrategy.PERIODIC);
-        store.start();
-        final Journal journal = store.getJournal();
-        assertTrue(journal.isJournalDiskSyncPeriodic());
-        assertFalse(store.isEnableJournalDiskSyncs());
-
-        MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
-
-        //write a message to the store
-        writeMessage(messageStore, 1);
-
-        //Make sure the flag was set to true
-        assertTrue(Wait.waitFor(new Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return journal.currentFileNeedSync.get();
-            }
-        }));
-
-        //Make sure a disk sync was done by the executor because a message was added
-        //which will cause the flag to be set to false
-        assertTrue(Wait.waitFor(new Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return !journal.currentFileNeedSync.get();
-            }
-        }));
-
-    }
-
-    @Test
-    public void testSyncRotate()throws Exception {
-        store = configureStore(JournalDiskSyncStrategy.PERIODIC);
-        //Set a long interval to make sure it isn't called in this test
-        store.setJournalDiskSyncInterval(10 * 1000);
-        store.start();
-
-        final Journal journal = store.getJournal();
-        assertTrue(journal.isJournalDiskSyncPeriodic());
-        assertFalse(store.isEnableJournalDiskSyncs());
-        assertEquals(10 * 1000, store.getJournalDiskSyncInterval());
-        journal.currentFileNeedSync.set(true);        //Make sure a disk sync was done by the executor because a message was added
-
-        //get the current file but pass in a size greater than the
-        //journal length to trigger a rotation so we can verify that it was synced
-        journal.getCurrentDataFile(2 * defaultJournalLength);
-
-        //verify a sync was called (which will set this flag to false)
-        assertFalse(journal.currentFileNeedSync.get());
-    }
-
-    @Test
-    public void testAlwaysSync()throws Exception {
-        store = configureStore(JournalDiskSyncStrategy.ALWAYS);
-        store.start();
-        assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
-        assertTrue(store.isEnableJournalDiskSyncs());
-    }
-
-    @Test
-    public void testNeverSync() throws Exception {
-        store = configureStore(JournalDiskSyncStrategy.NEVER);
-        store.start();
-        assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
-        assertFalse(store.isEnableJournalDiskSyncs());
-    }
-
-    private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {
-        KahaDBStore store = new KahaDBStore();
-        store.setJournalMaxFileLength(defaultJournalLength);
-        store.deleteAllMessages();
-        store.setDirectory(dataFileDir.getRoot());
-        if (strategy != null) {
-            store.setJournalDiskSyncStrategy(strategy.name());
-        }
-
-        return store;
-    }
-
-    private void writeMessage(final MessageStore messageStore, int num) throws Exception {
-        ActiveMQTextMessage message = new ActiveMQTextMessage();
-        message.setText("testtesttest");
-        MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + num);
-        messageId.setBrokerSequenceId(num);
-        message.setMessageId(messageId);
-        messageStore.addMessage(new ConnectionContext(), message);
-    }
-
-
-}


Mime
View raw message