activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [7/8] activemq-artemis git commit: ARTEMIS-822 Injecting IO Pools into and from ArtemisServerImpl
Date Mon, 31 Oct 2016 20:01:17 GMT
 ARTEMIS-822 Injecting IO Pools into and from ArtemisServerImpl

 https://issues.apache.org/jira/browse/ARTEMIS-822


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7eadff76
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7eadff76
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7eadff76

Branch: refs/heads/master
Commit: 7eadff76818546aa6045be2eeb2e6aef60992394
Parents: 6afde8f
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Oct 28 11:11:59 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Oct 31 11:34:27 2016 -0400

----------------------------------------------------------------------
 .../cli/commands/tools/XmlDataExporter.java     | 14 +++---
 .../journal/JMSJournalStorageManagerImpl.java   |  6 ++-
 .../jms/server/impl/JMSServerManagerImpl.java   | 15 +++---
 .../artemis/core/journal/impl/JournalImpl.java  | 49 +++++++++++---------
 .../journal/AbstractJournalStorageManager.java  | 10 +++-
 .../impl/journal/JDBCJournalStorageManager.java |  6 ++-
 .../impl/journal/JournalStorageManager.java     | 20 ++++----
 .../artemis/core/server/ActiveMQServer.java     |  2 +
 .../core/server/impl/ActiveMQServerImpl.java    | 12 +++--
 .../journal/NIOJournalCompactTest.java          | 19 ++++++--
 .../DeleteMessagesOnStartupTest.java            |  2 +-
 .../integration/persistence/RestartSMTest.java  |  2 +-
 .../persistence/StorageManagerTestBase.java     |  6 +--
 .../replication/ReplicationTest.java            |  2 +-
 .../server/SuppliedThreadPoolTest.java          |  2 +
 .../journal/impl/AlignedJournalImplTest.java    |  2 -
 .../core/journal/impl/JournalImplTestUnit.java  |  6 ---
 .../impl/DuplicateDetectionUnitTest.java        | 10 ++--
 18 files changed, 108 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index a0e6c1e..8030ce2 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -90,6 +90,7 @@ import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorag
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 
 @Command(name = "exp", description = "Export all message-data using an XML that could be
interpreted by any system.")
 public final class XmlDataExporter extends OptionalLocking {
@@ -142,15 +143,10 @@ public final class XmlDataExporter extends OptionalLocking {
                        String pagingDir,
                        String largeMessagesDir) throws Exception {
       config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
-      final ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
-      ExecutorFactory executorFactory = new ExecutorFactory() {
-         @Override
-         public Executor getExecutor() {
-            return executor;
-         }
-      };
+      final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
+      ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
 
-      storageManager = new JournalStorageManager(config, executorFactory);
+      storageManager = new JournalStorageManager(config, executorFactory, executorFactory);
 
       XMLOutputFactory factory = XMLOutputFactory.newInstance();
       XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
@@ -158,6 +154,8 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(),
new Class[]{XMLStreamWriter.class}, handler);
 
       writeXMLData();
+
+      executor.shutdown();
    }
 
    private void writeXMLData() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
index 32c438d..0aaa1a6 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
 import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
 import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
 import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
 
 public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
@@ -73,7 +74,8 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager
{
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-   public JMSJournalStorageManagerImpl(final IDGenerator idGenerator,
+   public JMSJournalStorageManagerImpl(ExecutorFactory ioExecutors,
+                                       final IDGenerator idGenerator,
                                        final Configuration config,
                                        final ReplicationManager replicator) {
       if (config.getJournalType() != JournalType.NIO && config.getJournalType() !=
JournalType.ASYNCIO) {
@@ -86,7 +88,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager
{
 
       SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(),
1);
 
-      Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
+      Journal localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(),
config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms",
"jms", 1, 0);
 
       if (replicator != null) {
          jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index dfa9218..456bb58 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -1544,16 +1544,13 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
     * @throws Exception
     */
    private void createJournal() throws Exception {
-      if (storage == null) {
-         if (coreConfig.isPersistenceEnabled()) {
-            storage = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), server.getConfiguration(),
server.getReplicationManager());
-         } else {
-            storage = new NullJMSStorageManagerImpl();
-         }
+      if (storage != null) {
+         storage.stop();
+      }
+      if (coreConfig.isPersistenceEnabled()) {
+         storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(),
server.getConfiguration(), server.getReplicationManager());
       } else {
-         if (storage.isStarted()) {
-            storage.stop();
-         }
+         storage = new NullJMSStorageManagerImpl();
       }
 
       storage.start();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 983bd7d..b1093ed 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -74,6 +75,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.SimpleFuture;
 import org.jboss.logging.Logger;
@@ -185,8 +187,8 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
 
    private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
 
-   private final OrderedExecutorFactory providedIOThreadPool;
-   protected OrderedExecutorFactory ioExecutorFactory;
+   private final ExecutorFactory providedIOThreadPool;
+   protected ExecutorFactory ioExecutorFactory;
    private ThreadPoolExecutor threadPool;
 
    /**
@@ -234,7 +236,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
       this(null, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory,
filePrefix, fileExtension, maxAIO, userVersion);
    }
 
-   public JournalImpl(final OrderedExecutorFactory ioExecutors,
+   public JournalImpl(final ExecutorFactory ioExecutors,
                       final int fileSize,
                       final int minFiles,
                       final int poolSize,
@@ -744,7 +746,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                if (result != null) {
                   result.fail(e);
                }
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendAddRecord::"  + e, e);
             } finally {
                pendingRecords.remove(id);
                journalLock.readLock().unlock();
@@ -801,7 +803,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                if (result != null) {
                   result.fail(e);
                }
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendUpdateRecord:" + e, e);
             } finally {
                journalLock.readLock().unlock();
             }
@@ -851,7 +853,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                if (result != null) {
                   result.fail(e);
                }
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendDeleteRecord:" + e, e);
             } finally {
                journalLock.readLock().unlock();
             }
@@ -899,7 +901,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
 
                tx.addPositive(usedFile, id, addRecord.getEncodeSize());
             } catch (Exception e) {
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendAddRecordTransactional:" + e, e);
                setErrorCondition(tx, e);
             } finally {
                journalLock.readLock().unlock();
@@ -979,7 +981,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
 
                tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
             } catch ( Exception e ) {
-               ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e );
+               logger.error("appendUpdateRecordTransactional:" +  e.getMessage(), e );
                setErrorCondition( tx, e );
             } finally {
                journalLock.readLock().unlock();
@@ -1016,7 +1018,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
 
                tx.addNegative(usedFile, id);
             } catch (Exception e) {
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendDeleteRecordTransactional:" + e, e);
                setErrorCondition(tx, e);
             } finally {
                journalLock.readLock().unlock();
@@ -1069,7 +1071,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                if (result != null) {
                   result.fail(e);
                }
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendPrepareRecord:" + e, e);
                setErrorCondition(tx, e);
             } finally {
                journalLock.readLock().unlock();
@@ -1142,7 +1144,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                if (result != null) {
                   result.fail(e);
                }
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendCommitRecord:" + e, e);
                setErrorCondition(tx, e);
             } finally {
                journalLock.readLock().unlock();
@@ -1185,7 +1187,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                if (result != null) {
                   result.fail(e);
                }
-               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               logger.error("appendRollbackRecord:" + e, e);
                setErrorCondition(tx, e);
             }  finally {
                journalLock.readLock().unlock();
@@ -2067,7 +2069,6 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
    public void flush() throws Exception {
       fileFactory.flush();
 
-
       flushExecutor(appendExecutor);
 
       flushExecutor(filesExecutor);
@@ -2081,16 +2082,21 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
          // Send something to the closingExecutor, just to make sure we went until its end
          final CountDownLatch latch = new CountDownLatch(1);
 
-         executor.execute(new Runnable() {
+         try {
+            executor.execute(new Runnable() {
 
-            @Override
-            public void run() {
-               latch.countDown();
-            }
+               @Override
+               public void run() {
+                  latch.countDown();
+               }
 
-         });
-         latch.await(10, TimeUnit.SECONDS);
+            });
+            latch.await(10, TimeUnit.SECONDS);
+         } catch (RejectedExecutionException ignored ) {
+            // this is fine
+         }
       }
+
    }
 
    @Override
@@ -2243,7 +2249,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
    @Override
    public synchronized void stop() throws Exception {
       if (state == JournalState.STOPPED) {
-         throw new IllegalStateException("Journal is already stopped");
+         return;
       }
 
       setJournalState(JournalState.STOPPED);
@@ -2905,6 +2911,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
       try {
          scheduleCompactAndBlock(60);
       } catch (Exception e) {
+         e.printStackTrace();
          throw new RuntimeException(e);
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 768be45..ecaa86e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -146,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager
{
 
    protected BatchingIDGenerator idGenerator;
 
+   protected final ExecutorFactory ioExecutors;
+
    protected final ScheduledExecutorService scheduledExecutorService;
 
    protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
@@ -186,18 +188,22 @@ public abstract class AbstractJournalStorageManager implements StorageManager
{
 
    public AbstractJournalStorageManager(final Configuration config,
                                         final ExecutorFactory executorFactory,
-                                        final ScheduledExecutorService scheduledExecutorService)
{
-      this(config, executorFactory, scheduledExecutorService, null);
+                                        final ScheduledExecutorService scheduledExecutorService,
+                                        final ExecutorFactory ioExecutors) {
+      this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
    }
 
    public AbstractJournalStorageManager(Configuration config,
                                         ExecutorFactory executorFactory,
                                         ScheduledExecutorService scheduledExecutorService,
+                                        ExecutorFactory ioExecutors,
                                         IOCriticalErrorListener criticalErrorListener) {
       this.executorFactory = executorFactory;
 
       this.ioCriticalErrorListener = criticalErrorListener;
 
+      this.ioExecutors = ioExecutors;
+
       this.scheduledExecutorService = scheduledExecutorService;
 
       this.config = config;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index d97f988..e4d401b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -36,15 +36,17 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
 
    public JDBCJournalStorageManager(Configuration config,
                                     ExecutorFactory executorFactory,
+                                    ExecutorFactory ioExecutorFactory,
                                     ScheduledExecutorService scheduledExecutorService) {
-      super(config, executorFactory, scheduledExecutorService);
+      super(config, executorFactory, scheduledExecutorService, ioExecutorFactory);
    }
 
    public JDBCJournalStorageManager(final Configuration config,
                                     final ScheduledExecutorService scheduledExecutorService,
                                     final ExecutorFactory executorFactory,
+                                    final ExecutorFactory ioExecutorFactory,
                                     final IOCriticalErrorListener criticalErrorListener)
{
-      super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
+      super(config, executorFactory, scheduledExecutorService, ioExecutorFactory, criticalErrorListener);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 2d8411a..24650e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -86,25 +86,28 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
 
    public JournalStorageManager(final Configuration config,
                                 final ExecutorFactory executorFactory,
-                                final ScheduledExecutorService scheduledExecutorService)
{
-      this(config, executorFactory, scheduledExecutorService, null);
+                                final ScheduledExecutorService scheduledExecutorService,
+                                final ExecutorFactory ioExecutors) {
+      this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
    }
 
-   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
{
-      this(config, executorFactory, null, null);
+   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory,
final ExecutorFactory ioExecutors) {
+      this(config, executorFactory, null, ioExecutors, null);
    }
 
    public JournalStorageManager(final Configuration config,
                                 final ExecutorFactory executorFactory,
                                 final ScheduledExecutorService scheduledExecutorService,
+                                final ExecutorFactory ioExecutors,
                                 final IOCriticalErrorListener criticalErrorListener) {
-      super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
+      super(config, executorFactory, scheduledExecutorService, ioExecutors, criticalErrorListener);
    }
 
    public JournalStorageManager(final Configuration config,
                                 final ExecutorFactory executorFactory,
+                                final ExecutorFactory ioExecutors,
                                 final IOCriticalErrorListener criticalErrorListener) {
-      super(config, executorFactory, null, criticalErrorListener);
+      super(config, executorFactory, null, ioExecutors, criticalErrorListener);
    }
 
    @Override
@@ -116,7 +119,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
 
       bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener,
config.getJournalMaxIO_NIO());
 
-      Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings",
"bindings", 1);
+      Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings",
"bindings", 1, 0);
 
       bindingsJournal = localBindings;
       originalBindingsJournal = localBindings;
@@ -132,7 +135,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
          throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
       }
 
-      Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(),
journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
: config.getJournalMaxIO_NIO());
+      Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(),
journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
: config.getJournalMaxIO_NIO(), 0);
+
       messageJournal = localMessage;
       originalMessageJournal = localMessage;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 477f839..a43fec8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -345,6 +345,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    ExecutorFactory getExecutorFactory();
 
+   ExecutorFactory getIOExecutorFactory();
+
    void setGroupingHandler(GroupingHandler groupingHandler);
 
    GroupingHandler getGroupingHandler();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 6288bdf..d2de964 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -232,8 +232,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private volatile ExecutorFactory executorFactory;
 
-
    private volatile ExecutorService ioExecutorPool;
+
    /**
     * This is a thread pool for io tasks only.
     * We can't use the same global executor to avoid starvations.
@@ -1637,6 +1637,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public ExecutorFactory getIOExecutorFactory() {
+      return ioExecutorFactory;
+   }
+
+   @Override
    public void setGroupingHandler(final GroupingHandler groupingHandler) {
       if (this.groupingHandler != null && managementService != null) {
          // Removing old groupNotification
@@ -1770,10 +1775,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    private StorageManager createStorageManager() {
       if (configuration.isPersistenceEnabled()) {
          if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType()
== StoreConfiguration.StoreType.DATABASE) {
-            return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory,
shutdownOnCriticalIO);
+            return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory,
ioExecutorFactory, shutdownOnCriticalIO);
          } else {
             // Default to File Based Storage Manager, (Legacy default configuration).
-            return new JournalStorageManager(configuration, executorFactory, scheduledPool,
shutdownOnCriticalIO);
+            return new JournalStorageManager(configuration, executorFactory, scheduledPool,
ioExecutorFactory, shutdownOnCriticalIO);
          }
       }
       return new NullStorageManager();
@@ -1847,6 +1852,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          });
 
          this.ioExecutorPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), tFactory);
+         this.ioExecutorFactory = new OrderedExecutorFactory(ioExecutorPool);
       }
 
        /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry.
 If so we use this

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index 519ffb5..42c48f3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -1623,11 +1623,15 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
       final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
 
+      final ExecutorService ioexecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
+
       OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
 
+      OrderedExecutorFactory iofactory = new OrderedExecutorFactory(ioexecutor);
+
       final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
 
-      final JournalStorageManager storage = new JournalStorageManager(config, factory);
+      final JournalStorageManager storage = new JournalStorageManager(config, factory, iofactory);
 
       storage.start();
 
@@ -1681,7 +1685,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
                                     for (long messageID : values) {
                                        storage.deleteMessage(messageID);
                                     }
-                                 } catch (Exception e) {
+                                 } catch (Throwable e) {
                                     e.printStackTrace();
                                     errors.incrementAndGet();
                                  }
@@ -1733,11 +1737,17 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
          deleteExecutor.shutdown();
 
-         assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
+         assertTrue("delete executor failted to terminate", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
+
+         storage.stop();
 
          executor.shutdown();
 
-         assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS));
+         assertTrue("executor failed to terminate", executor.awaitTermination(30, TimeUnit.SECONDS));
+
+         ioexecutor.shutdown();
+
+         assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS));
 
       } catch (Throwable e) {
          e.printStackTrace();
@@ -1751,6 +1761,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
          executor.shutdownNow();
          deleteExecutor.shutdownNow();
+         ioexecutor.shutdownNow();
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index 9848c39..7d515d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
 
    @Override
    protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
-      return new JournalStorageManager(configuration, execFactory) {
+      return new JournalStorageManager(configuration, execFactory, execFactory) {
          @Override
          public void deleteMessage(final long messageID) throws Exception {
             deletedMessage.add(messageID);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
index 5828baf..49d3a12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
@@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase {
 
       PostOffice postOffice = new FakePostOffice();
 
-      final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory);
+      final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, execFactory);
 
       try {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index 814bf0d..a104363 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -137,7 +137,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
     * @param configuration
     */
    protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
-      JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory);
+      JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, execFactory);
       addActiveMQComponent(jsm);
       return jsm;
    }
@@ -146,7 +146,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
     * @param configuration
     */
    protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
-      JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService);
+      JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, execFactory, scheduledExecutorService);
       addActiveMQComponent(jsm);
       return jsm;
    }
@@ -155,7 +155,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
     * @throws Exception
     */
    protected void createJMSStorage() throws Exception {
-      jmsJournal = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null);
+      jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null);
       addActiveMQComponent(jmsJournal);
       jmsJournal.start();
       jmsJournal.load();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 7d2d514..1ae9527 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -435,7 +435,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
     * @throws Exception
     */
    private JournalStorageManager getStorage() throws Exception {
-      return new JournalStorageManager(createDefaultInVMConfig(), factory);
+      return new JournalStorageManager(createDefaultInVMConfig(), factory, factory);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
index 65cd6b9..1deb1bb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/SuppliedThreadPoolTest.java
@@ -44,6 +44,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
    public void setup() throws Exception {
       serviceRegistry = new ServiceRegistryImpl();
       serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
+      serviceRegistry.setIOExecutorService(Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
       serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
       server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry);
       server.start();
@@ -58,6 +59,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
       }
       serviceRegistry.getExecutorService().shutdown();
       serviceRegistry.getScheduledExecutorService().shutdown();
+      serviceRegistry.getIOExecutorService().shutdown();
       super.tearDown();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index 2b24296..be6e5b3 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -943,8 +943,6 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
       journalImpl.checkReclaimStatus();
       journalImpl.flush();
 
-      Assert.assertEquals(2, factory.listFiles("tt").size());
-
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index eb815ae..3be030d 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -62,12 +62,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
       } catch (IllegalStateException e) {
          // OK
       }
-      try {
-         stopJournal();
-         Assert.fail("Should throw exception");
-      } catch (IllegalStateException e) {
-         // OK
-      }
       startJournal();
       try {
          startJournal();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7eadff76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index fcd32c5..96fa35c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -70,7 +70,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+      executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
       factory = new OrderedExecutorFactory(executor);
    }
 
@@ -92,7 +92,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
 
-         journal = new JournalStorageManager(configuration, factory);
+         journal = new JournalStorageManager(configuration, factory, factory);
 
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -112,7 +112,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          journal.stop();
 
-         journal = new JournalStorageManager(configuration, factory);
+         journal = new JournalStorageManager(configuration, factory, factory);
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
 
@@ -135,7 +135,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          mapDups.clear();
 
-         journal = new JournalStorageManager(configuration, factory);
+         journal = new JournalStorageManager(configuration, factory, factory);
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
 
@@ -146,6 +146,8 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
          values = mapDups.get(ADDRESS);
 
          Assert.assertEquals(10, values.size());
+
+         scheduledThreadPool.shutdown();
       } finally {
          if (journal != null) {
             try {


Mime
View raw message