activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [2/6] activemq-artemis git commit: ARTEMIS-906 Memory Mapped JournalType
Date Fri, 03 Feb 2017 14:30:08 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
new file mode 100644
index 0000000..cf87cde
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
+
+   @Override
+   protected SequentialFileFactory createFactory(String folder) {
+      return new MappedSequentialFileFactory(new File(folder));
+   }
+
+   @Test
+   public void testInterrupts() throws Throwable {
+
+      final EncodingSupport fakeEncoding = new EncodingSupport() {
+         @Override
+         public int getEncodeSize() {
+            return 10;
+         }
+
+         @Override
+         public void encode(ActiveMQBuffer buffer) {
+            buffer.writeBytes(new byte[10]);
+         }
+
+         @Override
+         public void decode(ActiveMQBuffer buffer) {
+
+         }
+      };
+
+      final AtomicInteger calls = new AtomicInteger(0);
+      final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()),
(code, message, file) -> {
+         new Exception("shutdown").printStackTrace();
+         calls.incrementAndGet();
+      });
+
+      Thread threadOpen = new Thread() {
+         @Override
+         public void run() {
+            try {
+               Thread.currentThread().interrupt();
+               SequentialFile file = factory.createSequentialFile("file.txt");
+               file.open();
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      threadOpen.start();
+      threadOpen.join();
+
+      Thread threadClose = new Thread() {
+         @Override
+         public void run() {
+            try {
+               SequentialFile file = factory.createSequentialFile("file.txt");
+               file.open();
+               file.write(fakeEncoding, true);
+               Thread.currentThread().interrupt();
+               file.close();
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      threadClose.start();
+      threadClose.join();
+
+      Thread threadWrite = new Thread() {
+         @Override
+         public void run() {
+            try {
+               SequentialFile file = factory.createSequentialFile("file.txt");
+               file.open();
+               Thread.currentThread().interrupt();
+               file.write(fakeEncoding, true);
+               file.close();
+
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      threadWrite.start();
+      threadWrite.join();
+
+      Thread threadFill = new Thread() {
+         @Override
+         public void run() {
+            try {
+               SequentialFile file = factory.createSequentialFile("file.txt");
+               file.open();
+               Thread.currentThread().interrupt();
+               file.fill(1024);
+               file.close();
+
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      threadFill.start();
+      threadFill.join();
+
+      Thread threadWriteDirect = new Thread() {
+         @Override
+         public void run() {
+            try {
+               SequentialFile file = factory.createSequentialFile("file.txt");
+               file.open();
+               ByteBuffer buffer = ByteBuffer.allocate(10);
+               buffer.put(new byte[10]);
+               Thread.currentThread().interrupt();
+               file.writeDirect(buffer, true);
+               file.close();
+
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      threadWriteDirect.start();
+      threadWriteDirect.join();
+
+      Thread threadRead = new Thread() {
+         @Override
+         public void run() {
+            try {
+               SequentialFile file = factory.createSequentialFile("file.txt");
+               file.open();
+               file.write(fakeEncoding, true);
+               file.position(0);
+               ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize());
+               Thread.currentThread().interrupt();
+               file.read(readBytes);
+               file.close();
+
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      threadRead.start();
+      threadRead.join();
+
+      // An interrupt exception shouldn't issue a shutdown
+      Assert.assertEquals(0, calls.get());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
index 8f15c48..d2ffd6f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
@@ -22,8 +22,11 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@@ -102,6 +105,27 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
       internalTest("nio2", getTestDir(), 10000, 0, true, true, 1);
    }
 
+   @Test
+   public void testMMap() throws Exception {
+      internalTest("mmap", getTestDir(), 10000, 100, true, true, 1);
+   }
+
+   @Test
+   public void testMMAPHugeTransaction() throws Exception {
+      internalTest("mmap", getTestDir(), 10000, 10000, true, true, 1);
+   }
+
+   @Test
+   public void testMMAPOMultiThread() throws Exception {
+      internalTest("mmap", getTestDir(), 1000, 100, true, true, 10);
+   }
+
+   @Test
+   public void testMMAPNonTransactional() throws Exception {
+      internalTest("mmap", getTestDir(), 10000, 0, true, true, 1);
+   }
+
+
    // Package protected ---------------------------------------------
 
    private void internalTest(final String type,
@@ -234,7 +258,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
 
       if (args.length != 5) {
          System.err.println("Use: java -cp <classpath> " + ValidateTransactionHealthTest.class.getCanonicalName()
+
-                               " aio|nio <journalDirectory> <NumberOfElements>
<TransactionSize> <NumberOfThreads>");
+                               " aio|nio|mmap <journalDirectory> <NumberOfElements>
<TransactionSize> <NumberOfThreads>");
          System.exit(-1);
       }
       System.out.println("Running");
@@ -320,15 +344,22 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase
{
    }
 
    public static JournalImpl createJournal(final String journalType, final String journalDir)
{
-      JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType,
journalDir), "journaltst", "tst", 500);
+      JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType,
journalDir, 10485760), "journaltst", "tst", 500);
       return journal;
    }
 
-   public static SequentialFileFactory getFactory(final String factoryType, final String
directory) {
+   public static SequentialFileFactory getFactory(final String factoryType, final String
directory, int fileSize) {
       if (factoryType.equals("aio")) {
          return new AIOSequentialFileFactory(new File(directory), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 10, false);
       } else if (factoryType.equals("nio2")) {
          return new NIOSequentialFileFactory(new File(directory), true, 1);
+      } else if (factoryType.equals("mmap")) {
+         return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener()
{
+            @Override
+            public void onIOException(Throwable code, String message, SequentialFile file)
{
+               code.printStackTrace();
+            }
+         }, true).chunkBytes(fileSize).overlapBytes(0);
       } else {
          return new NIOSequentialFileFactory(new File(directory), false, 1);
       }


Mime
View raw message