activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/5] activemq-artemis git commit: ARTEMIS-319 Improving files allocation and implementing journal-pool-files
Date Thu, 10 Dec 2015 22:32:38 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
new file mode 100644
index 0000000..32d8816
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.performance.storage;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.RouteContextList;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class PersistMultiThreadTest extends ActiveMQTestBase {
+
+   final String DIRECTORY = "./target/journaltmp";
+
+   FakePagingStore fakePagingStore = new FakePagingStore();
+
+   @Test
+   public void testMultipleWrites() throws Exception {
+      deleteDirectory(new File(DIRECTORY));
+      ActiveMQServer server = createServer(true);
+      server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles());
+      server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage());
+      server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal");
+      server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings");
+      server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging");
+      server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage");
+
+      server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+      server.getConfiguration().setJournalMinFiles(2);
+      server.getConfiguration().setJournalType(JournalType.ASYNCIO);
+
+      server.start();
+
+      StorageManager storage = server.getStorageManager();
+
+      long msgID = storage.generateID();
+      System.out.println("msgID=" + msgID);
+
+      int NUMBER_OF_THREADS = 50;
+      int NUMBER_OF_MESSAGES = 5000;
+
+      MyThread[] threads = new MyThread[NUMBER_OF_THREADS];
+
+      final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
+      final CountDownLatch startFlag = new CountDownLatch(1);
+      final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS);
+
+      MyDeleteThread deleteThread = new MyDeleteThread("deleteThread", storage, NUMBER_OF_MESSAGES * NUMBER_OF_THREADS * 10);
+      deleteThread.start();
+
+      for (int i = 0; i < threads.length; i++) {
+         threads[i] = new MyThread("writer::" + i, storage, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag);
+      }
+
+      for (MyThread t : threads) {
+         t.start();
+      }
+
+      alignFlag.await();
+
+      long startTime = System.currentTimeMillis();
+      startFlag.countDown();
+
+      // I'm using a countDown to avoid measuring time spent on thread context from join.
+      // i.e. i want to measure as soon as the loops are done
+      finishFlag.await();
+      long endtime = System.currentTimeMillis();
+
+      System.out.println("Time:: " + (endtime - startTime));
+
+      for (MyThread t : threads) {
+         t.join();
+         Assert.assertEquals(0, t.errors.get());
+      }
+
+      deleteThread.join();
+      Assert.assertEquals(0, deleteThread.errors.get());
+
+   }
+
+   LinkedBlockingDeque<Long> deletes = new LinkedBlockingDeque<>();
+
+   class MyThread extends Thread {
+
+      final StorageManager storage;
+      final int numberOfMessages;
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      final CountDownLatch align;
+      final CountDownLatch start;
+      final CountDownLatch finish;
+
+      MyThread(String name,
+               StorageManager storage,
+               int numberOfMessages,
+               CountDownLatch align,
+               CountDownLatch start,
+               CountDownLatch finish) {
+         super(name);
+         this.storage = storage;
+         this.numberOfMessages = numberOfMessages;
+         this.align = align;
+         this.start = start;
+         this.finish = finish;
+      }
+
+      public void run() {
+         try {
+            align.countDown();
+            start.await();
+
+            long id = storage.generateID();
+            long txID = storage.generateID();
+
+            // each thread will store a single message that will never be deleted, trying to force compacting to happen
+            storeMessage(txID, id);
+            storage.commit(txID);
+
+            OperationContext ctx = storage.getContext();
+
+            for (int i = 0; i < numberOfMessages; i++) {
+
+               txID = storage.generateID();
+
+               long[] messageID = new long[10];
+
+               for (int msgI = 0; msgI < 10; msgI++) {
+                  id = storage.generateID();
+
+                  messageID[msgI] = id;
+
+                  storeMessage(txID, id);
+               }
+
+               storage.commit(txID);
+               ctx.waitCompletion();
+
+               for (long deleteID : messageID) {
+                  deletes.add(deleteID);
+               }
+            }
+         }
+         catch (Exception e) {
+            e.printStackTrace();
+            errors.incrementAndGet();
+         }
+         finally {
+            finish.countDown();
+         }
+
+      }
+
+      private void storeMessage(long txID, long id) throws Exception {
+         ServerMessage message = new ServerMessageImpl(id, 10 * 1024);
+         message.setPagingStore(fakePagingStore);
+
+         message.getBodyBuffer().writeBytes(new byte[104]);
+         message.putStringProperty("hello", "" + id);
+
+         storage.storeMessageTransactional(txID, message);
+         storage.storeReferenceTransactional(txID, 1, id);
+
+         message.decrementRefCount();
+      }
+
+   }
+
+   class MyDeleteThread extends Thread {
+
+      final StorageManager storage;
+      final int numberOfMessages;
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      MyDeleteThread(String name, StorageManager storage, int numberOfMessages) {
+         super(name);
+         this.storage = storage;
+         this.numberOfMessages = numberOfMessages;
+      }
+
+      public void run() {
+         long deletesNr = 0;
+         try {
+
+            for (int i = 0; i < numberOfMessages; i++) {
+               if (i % 1000 == 0) {
+                  //                        storage.getContext().waitCompletion();
+                  //                        deletesNr = 0;
+                  //                        Thread.sleep(200);
+               }
+               deletesNr++;
+               Long deleteID = deletes.poll(10, TimeUnit.MINUTES);
+               if (deleteID == null) {
+                  System.err.println("Coudn't poll delete info");
+                  errors.incrementAndGet();
+                  break;
+               }
+
+               storage.storeAcknowledge(1, deleteID);
+               storage.deleteMessage(deleteID);
+            }
+         }
+         catch (Exception e) {
+            e.printStackTrace(System.out);
+            errors.incrementAndGet();
+         }
+         finally {
+            System.err.println("Finished the delete loop!!!! deleted " + deletesNr);
+         }
+      }
+   }
+
+   class FakePagingStore implements PagingStore {
+
+      @Override
+      public SimpleString getAddress() {
+         return null;
+      }
+
+      @Override
+      public int getNumberOfPages() {
+         return 0;
+      }
+
+      @Override
+      public int getCurrentWritingPage() {
+         return 0;
+      }
+
+      @Override
+      public SimpleString getStoreName() {
+         return null;
+      }
+
+      @Override
+      public File getFolder() {
+         return null;
+      }
+
+      @Override
+      public AddressFullMessagePolicy getAddressFullMessagePolicy() {
+         return null;
+      }
+
+      @Override
+      public long getFirstPage() {
+         return 0;
+      }
+
+      @Override
+      public long getPageSizeBytes() {
+         return 0;
+      }
+
+      @Override
+      public long getAddressSize() {
+         return 0;
+      }
+
+      @Override
+      public long getMaxSize() {
+         return 0;
+      }
+
+      @Override
+      public void applySetting(AddressSettings addressSettings) {
+
+      }
+
+      @Override
+      public boolean isPaging() {
+         return false;
+      }
+
+      @Override
+      public void sync() throws Exception {
+
+      }
+
+      @Override
+      public void ioSync() throws Exception {
+
+      }
+
+      @Override
+      public boolean page(ServerMessage message,
+                          Transaction tx,
+                          RouteContextList listCtx,
+                          ReentrantReadWriteLock.ReadLock readLock) throws Exception {
+         return false;
+      }
+
+      @Override
+      public Page createPage(int page) throws Exception {
+         return null;
+      }
+
+      @Override
+      public boolean checkPageFileExists(int page) throws Exception {
+         return false;
+      }
+
+      @Override
+      public PagingManager getPagingManager() {
+         return null;
+      }
+
+      @Override
+      public PageCursorProvider getCursorProvider() {
+         return null;
+      }
+
+      @Override
+      public void processReload() throws Exception {
+
+      }
+
+      @Override
+      public Page depage() throws Exception {
+         return null;
+      }
+
+      @Override
+      public void forceAnotherPage() throws Exception {
+
+      }
+
+      @Override
+      public Page getCurrentPage() {
+         return null;
+      }
+
+      @Override
+      public boolean startPaging() throws Exception {
+         return false;
+      }
+
+      @Override
+      public void stopPaging() throws Exception {
+
+      }
+
+      @Override
+      public void addSize(int size) {
+
+      }
+
+      @Override
+      public boolean checkMemory(Runnable runnable) {
+         return false;
+      }
+
+      @Override
+      public boolean lock(long timeout) {
+         return false;
+      }
+
+      @Override
+      public void unlock() {
+
+      }
+
+      @Override
+      public void flushExecutors() {
+
+      }
+
+      @Override
+      public Collection<Integer> getCurrentIds() throws Exception {
+         return null;
+      }
+
+      @Override
+      public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
+
+      }
+
+      @Override
+      public void disableCleanup() {
+
+      }
+
+      @Override
+      public void enableCleanup() {
+
+      }
+
+      @Override
+      public void start() throws Exception {
+
+      }
+
+      @Override
+      public void stop() throws Exception {
+
+      }
+
+      @Override
+      public boolean isStarted() {
+         return false;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java
new file mode 100644
index 0000000..c55ac22
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/SendReceiveMultiThreadTest.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.performance.storage;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
+
+   final String DIRECTORY = "./target/journaltmp";
+
+   ConnectionFactory cf;
+
+   Destination destination;
+
+   AtomicInteger received = new AtomicInteger(0);
+
+   AtomicInteger sent = new AtomicInteger(0);
+
+   int NUMBER_OF_THREADS = 400;
+   int NUMBER_OF_MESSAGES = 5000;
+
+   CountDownLatch receivedLatch = new CountDownLatch(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
+
+   @Test
+   public void testMultipleWrites() throws Exception {
+      deleteDirectory(new File(DIRECTORY));
+      ActiveMQServer server = createServer(true);
+      server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+      server.getConfiguration().setJournalMinFiles(2);
+      server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles());
+      server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage());
+      server.getConfiguration().setJournalType(JournalType.ASYNCIO);
+      server.getConfiguration().addAcceptorConfiguration("core", DefaultConnectionProperties.DEFAULT_BROKER_BIND_URL);
+      server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal");
+      server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings");
+      server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging");
+      server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage");
+      server.getConfiguration().setJournalMaxIO_AIO(200);
+
+      // TODO Setup Acceptors
+
+      server.start();
+
+      Queue queue = server.createQueue(SimpleString.toSimpleString("jms.queue.performanceQueue"), SimpleString.toSimpleString("jms.queue.performanceQueue"), null, true, false);
+
+      Queue queue2 = server.createQueue(SimpleString.toSimpleString("jms.queue.stationaryQueue"), SimpleString.toSimpleString("jms.queue.stationaryQueue"), null, true, false);
+
+      MyThread[] threads = new MyThread[NUMBER_OF_THREADS];
+
+      ConsumerThread[] cthreads = new ConsumerThread[NUMBER_OF_THREADS];
+
+      final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
+      final CountDownLatch startFlag = new CountDownLatch(1);
+      final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS);
+
+      cf = new ActiveMQConnectionFactory();
+
+      Thread slowSending = new Thread() {
+         public void run() {
+            Connection conn = null;
+            try {
+               conn = cf.createConnection();
+               Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+               MessageProducer producer = session.createProducer(ActiveMQJMSClient.createQueue("stationaryQueue"));
+
+               conn.start();
+               MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue("stationaryQueue"));
+
+               while (true) {
+                  for (int i = 0; i < 10; i++) {
+                     System.out.println("stationed message");
+                     producer.send(session.createTextMessage("stationed"));
+                     session.commit();
+
+                     Thread.sleep(1000);
+                  }
+
+                  for (int i = 0; i < 10; i++) {
+                     consumer.receive(5000);
+                     session.commit();
+                     System.out.println("Receiving stationed");
+                     Thread.sleep(1000);
+                  }
+               }
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+            }
+            finally {
+               try {
+                  conn.close();
+               }
+               catch (Exception ignored) {
+
+               }
+            }
+
+         }
+      };
+
+      slowSending.start();
+
+      destination = ActiveMQJMSClient.createQueue("performanceQueue");
+
+      for (int i = 0; i < threads.length; i++) {
+         threads[i] = new MyThread("sender::" + i, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag);
+         cthreads[i] = new ConsumerThread(NUMBER_OF_MESSAGES);
+      }
+
+      for (ConsumerThread t : cthreads) {
+         t.start();
+      }
+
+      for (MyThread t : threads) {
+         t.start();
+      }
+
+      Assert.assertEquals(NUMBER_OF_THREADS, queue.getConsumerCount());
+
+      alignFlag.await();
+
+      long startTime = System.currentTimeMillis();
+      startFlag.countDown();
+
+      // I'm using a countDown to avoid measuring time spent on thread context from join.
+      // i.e. i want to measure as soon as the loops are done
+      finishFlag.await();
+      long endtime = System.currentTimeMillis();
+
+      receivedLatch.await();
+      long endTimeConsuming = System.currentTimeMillis();
+
+      for (ConsumerThread t : cthreads) {
+         t.join();
+         Assert.assertEquals(0, t.errors);
+      }
+
+      for (MyThread t : threads) {
+         t.join();
+         Assert.assertEquals(0, t.errors.get());
+      }
+
+      slowSending.interrupt();
+      slowSending.join();
+
+      server.stop();
+
+      System.out.println("Time on sending:: " + (endtime - startTime));
+      System.out.println("Time on consuming:: " + (endTimeConsuming - startTime));
+   }
+
+   class ConsumerThread extends Thread {
+
+      final int numberOfMessages;
+
+      Connection connection;
+      Session session;
+
+      MessageConsumer consumer;
+
+      ConsumerThread(int numberOfMessages) throws Exception {
+         super("consumerthread");
+         this.numberOfMessages = numberOfMessages;
+
+         connection = cf.createConnection();
+         session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         consumer = session.createConsumer(destination);
+         connection.start();
+      }
+
+      int errors = 0;
+
+      public void run() {
+         try {
+
+            for (int i = 0; i < numberOfMessages; i++) {
+               Message message = consumer.receive(50000);
+               if (message == null) {
+                  System.err.println("Could not receive message at i = " + numberOfMessages);
+                  errors++;
+                  break;
+               }
+
+               int r = received.incrementAndGet();
+
+               if (r % 1000 == 0) {
+                  System.out.println("Received " + r + " messages");
+               }
+
+               if (i % 50 == 0) {
+                  session.commit();
+               }
+
+               receivedLatch.countDown();
+            }
+            session.commit();
+            connection.close();
+         }
+         catch (Exception e) {
+            e.printStackTrace();
+            errors++;
+         }
+
+      }
+   }
+
+   class MyThread extends Thread {
+
+      final int numberOfMessages;
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      final CountDownLatch align;
+      final CountDownLatch start;
+      final CountDownLatch finish;
+
+      MyThread(String name, int numberOfMessages, CountDownLatch align, CountDownLatch start, CountDownLatch finish) {
+         super(name);
+         this.numberOfMessages = numberOfMessages;
+         this.align = align;
+         this.start = start;
+         this.finish = finish;
+      }
+
+      public void run() {
+         try {
+
+            Connection connection = cf.createConnection();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            MessageProducer producer = session.createProducer(destination);
+
+            align.countDown();
+            start.await();
+
+            for (int i = 0; i < numberOfMessages; i++) {
+               BytesMessage msg = session.createBytesMessage();
+               msg.writeBytes(new byte[1024]);
+               producer.send(msg);
+               session.commit();
+
+               int s = sent.incrementAndGet();
+               if (s % 1000 == 0) {
+                  System.out.println("Sent " + s);
+               }
+            }
+
+            connection.close();
+            System.out.println("Send " + numberOfMessages + " messages on thread " + Thread.currentThread().getName());
+         }
+         catch (Exception e) {
+            e.printStackTrace();
+            errors.incrementAndGet();
+         }
+         finally {
+            finish.countDown();
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java
----------------------------------------------------------------------
diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java
index 5c7ec19..eb49a99 100644
--- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java
+++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/AddAndRemoveStressTest.java
@@ -75,7 +75,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
    public void testInsertAndLoad() throws Exception {
 
       SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
-      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
+      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
 
       impl.start();
 
@@ -91,7 +91,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
-      impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
 
       impl.start();
 
@@ -108,7 +108,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
-      impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
 
       impl.start();
 
@@ -136,7 +136,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
    public void testInsertUpdateAndLoad() throws Exception {
 
       SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
-      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
+      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
 
       impl.start();
 
@@ -153,7 +153,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
-      impl = new JournalImpl(10 * 1024 * 1024, 10, 0, 0, factory, "amq", "amq", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, 10, 10, 0, 0, factory, "amq", "amq", 1000);
 
       impl.start();
 
@@ -170,7 +170,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
-      impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
 
       impl.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
----------------------------------------------------------------------
diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
index 75600a1..4d9eedc 100644
--- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
+++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
@@ -115,7 +115,7 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase {
          maxAIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoNio();
       }
 
-      journal = new JournalImpl(50 * 1024, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) {
+      journal = new JournalImpl(50 * 1024, 20, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) {
          @Override
          protected void onCompactLockingTheJournal() throws Exception {
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java
----------------------------------------------------------------------
diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java
index a6133b2..93e631b 100644
--- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java
+++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/MixupCompactorTestBase.java
@@ -98,7 +98,7 @@ public abstract class MixupCompactorTestBase extends JournalImplTestBase {
 
    @Override
    public void createJournal() throws Exception {
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
+      journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
 
          @Override
          public void onCompactDone() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
----------------------------------------------------------------------
diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
index bdb68d5..b555302 100644
--- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
+++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
@@ -84,7 +84,7 @@ public class NIOMultiThreadCompactorStressTest extends ActiveMQTestBase {
          stopServer();
 
          NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 1);
-         JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, 0, factory, "activemq-data", "amq", 100);
+         JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 2, 0, 0, factory, "activemq-data", "amq", 100);
 
          List<RecordInfo> committedRecords = new ArrayList<>();
          List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index e01263b..1872665 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -141,7 +141,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
       factory = new FakeSequentialFileFactory(512, true);
 
       try {
-         journalImpl = new JournalImpl(2000, 2, 0, 0, factory, "tt", "tt", 1000);
+         journalImpl = new JournalImpl(2000, 2, 2, 0, 0, factory, "tt", "tt", 1000);
          Assert.fail("Expected IllegalArgumentException");
       }
       catch (IllegalArgumentException ignored) {
@@ -1201,7 +1201,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
    public void testAlignmentOverReload() throws Exception {
 
       factory = new FakeSequentialFileFactory(512, false);
-      journalImpl = new JournalImpl(512 + 512 * 3, 20, 0, 0, factory, "amq", "amq", 1000);
+      journalImpl = new JournalImpl(512 + 512 * 3, 20, 20, 0, 0, factory, "amq", "amq", 1000);
 
       journalImpl.start();
 
@@ -1214,7 +1214,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       journalImpl.stop();
 
-      journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000);
+      journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000);
       addActiveMQComponent(journalImpl);
       journalImpl.start();
       journalImpl.load(AlignedJournalImplTest.dummyLoader);
@@ -1230,7 +1230,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       journalImpl.stop();
 
-      journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000);
+      journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000);
       addActiveMQComponent(journalImpl);
       journalImpl.start();
 
@@ -1301,7 +1301,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
          journalImpl.stop();
       }
 
-      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
+      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
       addActiveMQComponent(journalImpl);
       journalImpl.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
index f8a907a..c3b37e1 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
@@ -209,7 +209,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
          journalImpl.stop();
       }
 
-      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
+      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
 
       journalImpl.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
index e502ff7..6c0564a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
@@ -54,6 +54,8 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
 
    protected int minFiles;
 
+   protected int poolSize;
+
    protected int fileSize;
 
    protected boolean sync;
@@ -122,7 +124,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
    // ---------------------------------------------------------------------------------
 
    protected void setup(final int minFreeFiles, final int fileSize, final boolean sync, final int maxAIO) {
+      this.minFiles = minFreeFiles;
+      this.poolSize = minFreeFiles;
+      this.fileSize = fileSize;
+      this.sync = sync;
+      this.maxAIO = maxAIO;
+   }
+
+   protected void setup(final int minFreeFiles, final int poolSize, final int fileSize, final boolean sync, final int maxAIO) {
       minFiles = minFreeFiles;
+      this.poolSize = poolSize;
       this.fileSize = fileSize;
       this.sync = sync;
       this.maxAIO = maxAIO;
@@ -130,13 +141,14 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
 
    protected void setup(final int minFreeFiles, final int fileSize, final boolean sync) {
       minFiles = minFreeFiles;
+      poolSize = minFreeFiles;
       this.fileSize = fileSize;
       this.sync = sync;
       maxAIO = 50;
    }
 
    public void createJournal() throws Exception {
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
+      journal = new JournalImpl(fileSize, minFiles, poolSize, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
          @Override
          public void onCompactDone() {
             latchDone.countDown();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index 02f1e9e..e714040 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -121,7 +121,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
    @Test
    public void testParams() throws Exception {
       try {
-         new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1);
+         new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1);
 
          Assert.fail("Should throw exception");
       }
@@ -130,7 +130,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       }
 
       try {
-         new JournalImpl(10 * 1024, 1, 0, 0, fileFactory, filePrefix, fileExtension, 1);
+         new JournalImpl(10 * 1024, 1, 0, 0, 0, fileFactory, filePrefix, fileExtension, 1);
 
          Assert.fail("Should throw exception");
       }
@@ -139,7 +139,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       }
 
       try {
-         new JournalImpl(10 * 1024, 10, 0, 0, null, filePrefix, fileExtension, 1);
+         new JournalImpl(10 * 1024, 10, 0, 0, 0, null, filePrefix, fileExtension, 1);
 
          Assert.fail("Should throw exception");
       }
@@ -148,7 +148,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       }
 
       try {
-         new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, null, fileExtension, 1);
+         new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, null, fileExtension, 1);
 
          Assert.fail("Should throw exception");
       }
@@ -157,7 +157,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       }
 
       try {
-         new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 1);
+         new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 1);
 
          Assert.fail("Should throw exception");
       }
@@ -166,7 +166,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       }
 
       try {
-         new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 0);
+         new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 0);
 
          Assert.fail("Should throw exception");
       }
@@ -567,6 +567,103 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       stopJournal();
    }
 
+
+   @Test
+   public void testOrganicallyGrowNoLimit() throws Exception {
+      setup(2, -1, 10 * 1024, true, 50);
+      createJournal();
+      journal.setAutoReclaim(true);
+      startJournal();
+      load();
+
+      List<String> files1 = fileFactory.listFiles(fileExtension);
+
+      Assert.assertEquals(2, files1.size());
+
+      Assert.assertEquals(0, journal.getDataFilesCount());
+      Assert.assertEquals(0, journal.getFreeFilesCount());
+      Assert.assertEquals(1, journal.getOpenedFilesCount());
+      Assert.assertEquals(0, journal.getIDMapSize());
+
+      // Fill all the files
+
+      for (int i = 0; i < 200; i++) {
+         add(i);
+         journal.forceMoveNextFile();
+      }
+
+
+      for (int i = 0; i < 200; i++) {
+         delete(i);
+      }
+      journal.forceMoveNextFile();
+
+      journal.checkReclaimStatus();
+
+
+
+      files1 = fileFactory.listFiles(fileExtension);
+      Assert.assertTrue(files1.size() > 200);
+
+      int numberOfFiles = files1.size();
+
+      for (int i = 300; i < 350; i++) {
+         add(i);
+         journal.forceMoveNextFile();
+      }
+      journal.checkReclaimStatus();
+
+
+      files1 = fileFactory.listFiles(fileExtension);
+      Assert.assertTrue(files1.size() > 200);
+
+      Assert.assertEquals(numberOfFiles, files1.size());
+
+      System.out.println("we have " + files1.size() + " files now");
+
+      stopJournal();
+   }
+
+   @Test
+   public void testOrganicallyWithALimit() throws Exception {
+      setup(2, 5, 10 * 1024, true, 50);
+      createJournal();
+      journal.setAutoReclaim(true);
+      startJournal();
+      load();
+
+      List<String> files1 = fileFactory.listFiles(fileExtension);
+
+      Assert.assertEquals(2, files1.size());
+
+      Assert.assertEquals(0, journal.getDataFilesCount());
+      Assert.assertEquals(0, journal.getFreeFilesCount());
+      Assert.assertEquals(1, journal.getOpenedFilesCount());
+      Assert.assertEquals(0, journal.getIDMapSize());
+
+      // Fill all the files
+
+      for (int i = 0; i < 200; i++) {
+         add(i);
+         journal.forceMoveNextFile();
+      }
+
+      journal.checkReclaimStatus();
+
+
+      for (int i = 0; i < 200; i++) {
+         delete(i);
+      }
+      journal.forceMoveNextFile();
+
+      journal.checkReclaimStatus();
+
+      files1 = fileFactory.listFiles(fileExtension);
+      Assert.assertTrue("supposed to have less than 10 but it had " + files1.size() + " files created", files1.size() < 10);
+
+      stopJournal();
+   }
+
    // Validate the methods that are used on assertions
    @Test
    public void testCalculations() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/351bcfc9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
index 554a5e3..88cbcaa 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
@@ -39,7 +39,7 @@ public class BatchIDGeneratorUnitTest extends ActiveMQTestBase {
    @Test
    public void testSequence() throws Exception {
       NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), 1);
-      Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
+      Journal journal = new JournalImpl(10 * 1024, 2, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
 
       journal.start();
 


Mime
View raw message