activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-727 Improving Thread usage on JDBC
Date Mon, 12 Sep 2016 22:51:03 GMT
ARTEMIS-727 Improving Thread usage on JDBC

https://issues.apache.org/jira/browse/ARTEMIS-727


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f8278ec9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f8278ec9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f8278ec9

Branch: refs/heads/master
Commit: f8278ec99c45f5ff58a11f6678bd2333caa3e01a
Parents: 1a9c29c
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Sep 8 20:46:21 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Sep 12 14:32:40 2016 -0400

----------------------------------------------------------------------
 artemis-commons/pom.xml                         |  18 ++
 .../core/server/ActiveMQScheduledComponent.java | 141 +++++++++
 .../artemis/utils/ThreadLeakCheckRule.java      | 288 +++++++++++++++++++
 artemis-jdbc-store/pom.xml                      |   7 +
 .../jdbc/store/journal/JDBCJournalImpl.java     |  33 ++-
 .../jdbc/store/journal/JDBCJournalSync.java     |  16 +-
 .../file/JDBCSequentialFileFactoryTest.java     |  15 +
 .../journal/JDBCJournalLoaderCallbackTest.java  |  16 ++
 .../core/journal/impl/SimpleWaitIOCallback.java |   3 +
 artemis-server/pom.xml                          |   9 +
 .../artemis/core/paging/impl/PageSyncTimer.java |  15 +-
 .../core/paging/impl/PagingStoreImpl.java       |   2 +-
 .../journal/AbstractJournalStorageManager.java  |  12 +-
 .../impl/journal/JDBCJournalStorageManager.java |  20 +-
 .../impl/journal/JournalStorageManager.java     |  28 +-
 .../core/server/ActiveMQScheduledComponent.java | 101 -------
 .../core/server/files/FileStoreMonitor.java     |   8 +-
 .../core/server/impl/ActiveMQServerImpl.java    |   8 +-
 .../core/server/reload/ReloadManagerImpl.java   |   6 +-
 .../artemis/core/reload/ReloadManagerTest.java  |   8 +-
 .../core/server/files/FileMoveManagerTest.java  |   2 +-
 .../core/server/files/FileStoreMonitorTest.java |   9 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |  80 +-----
 .../artemis/tests/util/ThreadLeakCheckRule.java | 216 --------------
 tests/activemq5-unit-tests/pom.xml              |   7 +
 tests/extra-tests/pom.xml                       |   7 +
 tests/integration-tests/pom.xml                 |   7 +
 .../broadcast/JGroupsBroadcastTest.java         |   2 +-
 .../jdbc/store/journal/JDBCJournalTest.java     |  37 ++-
 .../journal/NIOJournalCompactTest.java          |   2 +-
 .../DeleteMessagesOnStartupTest.java            |   2 +-
 .../integration/persistence/RestartSMTest.java  |   2 +-
 .../persistence/StorageManagerTestBase.java     |  12 +-
 .../replication/ReplicationTest.java            |   2 +-
 tests/unit-tests/pom.xml                        |   7 +
 .../impl/DuplicateDetectionUnitTest.java        |   6 +-
 36 files changed, 703 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-commons/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml
index 98566e4..da14fe8 100644
--- a/artemis-commons/pom.xml
+++ b/artemis-commons/pom.xml
@@ -67,4 +67,22 @@
       </dependency>
    </dependencies>
 
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <executions>
+               <execution>
+                  <phase>test</phase>
+                  <goals>
+                     <goal>test-jar</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+      </plugins>
+   </build>
+
+
 </project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
new file mode 100644
index 0000000..4d503eb
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.logging.Logger;
+
+/** This is for components with a scheduled at a fixed rate. */
+public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
+
+   private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
+   private final ScheduledExecutorService scheduledExecutorService;
+   private long period;
+   private TimeUnit timeUnit;
+   private final Executor executor;
+   private ScheduledFuture future;
+   private final boolean onDemand;
+
+   private final AtomicInteger delayed = new AtomicInteger(0);
+
+   public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
+                                     Executor executor,
+                                     long checkPeriod,
+                                     TimeUnit timeUnit,
+                                     boolean onDemand) {
+      this.executor = executor;
+      this.scheduledExecutorService = scheduledExecutorService;
+      if (this.scheduledExecutorService == null) {
+         throw new NullPointerException("scheduled Executor is null");
+      }
+      this.period = checkPeriod;
+      this.timeUnit = timeUnit;
+      this.onDemand = onDemand;
+   }
+
+   @Override
+   public synchronized void start() {
+      if (future != null) {
+         return;
+      }
+      if (onDemand) {
+         return;
+      }
+      if (period >= 0) {
+         future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit);
+      }
+      else {
+         logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
+      }
+   }
+
+   public void delay() {
+      int value = delayed.incrementAndGet();
+      if (value > 10) {
+         delayed.decrementAndGet();
+      }
+      else {
+         // We only schedule up to 10 periods upfront.
+         // this is to avoid a window where a current one would be running and a next one is coming.
+         // in theory just 2 would be enough. I'm using 10 as a precaution here.
+         scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
+      }
+   }
+
+   public long getPeriod() {
+      return period;
+   }
+
+   public synchronized ActiveMQScheduledComponent setPeriod(long period) {
+      this.period = period;
+      restartIfNeeded();
+      return this;
+   }
+
+   public TimeUnit getTimeUnit() {
+      return timeUnit;
+   }
+
+   public synchronized ActiveMQScheduledComponent setTimeUnit(TimeUnit timeUnit) {
+      this.timeUnit = timeUnit;
+      restartIfNeeded();
+      return this;
+   }
+
+   @Override
+   public synchronized void stop() {
+      if (future == null) {
+         return; // no big deal
+      }
+
+      future.cancel(false);
+      future = null;
+
+   }
+
+   public void run() {
+      delayed.decrementAndGet();
+   }
+
+   @Override
+   public synchronized boolean isStarted() {
+      return future != null;
+   }
+
+
+   // this will restart the schedulped component upon changes
+   private void restartIfNeeded() {
+      if (isStarted()) {
+         stop();
+         start();
+      }
+   }
+
+   final Runnable runForScheduler = new Runnable() {
+      @Override
+      public void run() {
+         executor.execute(ActiveMQScheduledComponent.this);
+      }
+   };
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java
new file mode 100644
index 0000000..b2c3bf6
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+
+/**
+ * This is useful to make sure you won't have leaking threads between tests
+ */
+public class ThreadLeakCheckRule extends ExternalResource {
+   private static Logger log = Logger.getLogger(ThreadLeakCheckRule.class);
+
+   private static Set<String> knownThreads = new HashSet<>();
+
+   boolean enabled = true;
+
+   private Map<Thread, StackTraceElement[]> previousThreads;
+
+   public void disable() {
+      enabled = false;
+   }
+
+   /**
+    * Override to set up your specific external resource.
+    *
+    * @throws if setup fails (which will disable {@code after}
+    */
+   @Override
+   protected void before() throws Throwable {
+      // do nothing
+
+      previousThreads = Thread.getAllStackTraces();
+
+   }
+
+   /**
+    * Override to tear down your specific external resource.
+    */
+   @Override
+   protected void after() {
+      try {
+         if (enabled) {
+            boolean failed = true;
+
+            boolean failedOnce = false;
+
+            long timeout = System.currentTimeMillis() + 60000;
+            while (failed && timeout > System.currentTimeMillis()) {
+               failed = checkThread();
+
+               if (failed) {
+                  failedOnce = true;
+                  forceGC();
+                  try {
+                     Thread.sleep(500);
+                  }
+                  catch (Throwable e) {
+                  }
+               }
+            }
+
+            if (failed) {
+               Assert.fail("Thread leaked");
+            }
+            else if (failedOnce) {
+               System.out.println("******************** Threads cleared after retries ********************");
+               System.out.println();
+            }
+
+         }
+         else {
+            enabled = true;
+         }
+      }
+      finally {
+         // clearing just to help GC
+         previousThreads = null;
+      }
+
+   }
+
+   private static int failedGCCalls = 0;
+
+   public static void forceGC() {
+
+      if (failedGCCalls >= 10) {
+         log.info("ignoring forceGC call since it seems System.gc is not working anyways");
+         return;
+      }
+      log.info("#test forceGC");
+      CountDownLatch finalized = new CountDownLatch(1);
+      WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
+
+      long timeout = System.currentTimeMillis() + 1000;
+
+      // A loop that will wait GC, using the minimal time as possible
+      while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
+         System.gc();
+         System.runFinalization();
+         try {
+            finalized.await(100, TimeUnit.MILLISECONDS);
+         }
+         catch (InterruptedException e) {
+         }
+      }
+
+      if (dumbReference.get() != null) {
+         failedGCCalls++;
+         log.info("It seems that GC is disabled at your VM");
+      }
+      else {
+         // a success would reset the count
+         failedGCCalls = 0;
+      }
+      log.info("#test forceGC Done ");
+   }
+
+   public static void forceGC(final Reference<?> ref, final long timeout) {
+      long waitUntil = System.currentTimeMillis() + timeout;
+      // A loop that will wait GC, using the minimal time as possible
+      while (ref.get() != null && System.currentTimeMillis() < waitUntil) {
+         ArrayList<String> list = new ArrayList<>();
+         for (int i = 0; i < 1000; i++) {
+            list.add("Some string with garbage with concatenation " + i);
+         }
+         list.clear();
+         list = null;
+         System.gc();
+         try {
+            Thread.sleep(500);
+         }
+         catch (InterruptedException e) {
+         }
+      }
+   }
+
+   public static void removeKownThread(String name) {
+      knownThreads.remove(name);
+   }
+
+   public static void addKownThread(String name) {
+      knownThreads.add(name);
+   }
+
+   private boolean checkThread() {
+      boolean failedThread = false;
+
+      Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
+
+      if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) {
+
+
+         for (Thread aliveThread : postThreads.keySet()) {
+            if (aliveThread.isAlive() && !isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) {
+               if (!failedThread) {
+                  System.out.println("*********************************************************************************");
+                  System.out.println("LEAKING THREADS");
+               }
+               failedThread = true;
+               System.out.println("=============================================================================");
+               System.out.println("Thread " + aliveThread + " is still alive with the following stackTrace:");
+               StackTraceElement[] elements = postThreads.get(aliveThread);
+               for (StackTraceElement el : elements) {
+                  System.out.println(el);
+               }
+            }
+
+         }
+         if (failedThread) {
+            System.out.println("*********************************************************************************");
+         }
+      }
+
+
+      return failedThread;
+   }
+
+
+   /**
+    * if it's an expected thread... we will just move along ignoring it
+    *
+    * @param thread
+    * @return
+    */
+   private boolean isExpectedThread(Thread thread) {
+      final String threadName = thread.getName();
+      final ThreadGroup group = thread.getThreadGroup();
+      final boolean isSystemThread = group != null && "system".equals(group.getName());
+      final String javaVendor = System.getProperty("java.vendor");
+
+      if (threadName.contains("SunPKCS11")) {
+         return true;
+      }
+      else if (threadName.contains("Attach Listener")) {
+         return true;
+      }
+      else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
+         return true;
+      }
+      else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("ClassCache Reaper")) {
+         return true;
+      }
+      else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) {
+         return true;
+      }
+      else if (threadName.contains("globalEventExecutor")) {
+         return true;
+      }
+      else if (threadName.contains("threadDeathWatcher")) {
+         return true;
+      }
+      else if (threadName.contains("netty-threads")) {
+         // This is ok as we use EventLoopGroup.shutdownGracefully() which will shutdown things with a bit of delay
+         // if the EventLoop's are still busy.
+         return true;
+      }
+      else if (threadName.contains("threadDeathWatcher")) {
+         //another netty thread
+         return true;
+      }
+      else if (threadName.contains("Abandoned connection cleanup thread")) {
+         // MySQL Engine checks for abandoned connections
+         return true;
+      }
+      else if (threadName.contains("hawtdispatch")) {
+         // Static workers used by MQTT client.
+         return true;
+      }
+      else {
+         for (StackTraceElement element : thread.getStackTrace()) {
+            if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
+               return true;
+            }
+         }
+
+         for (String known: knownThreads) {
+            if (threadName.contains(known)) {
+               return true;
+            }
+         }
+
+         return false;
+      }
+   }
+
+
+   protected static class DumbReference {
+
+      private CountDownLatch finalized;
+
+      public DumbReference(CountDownLatch finalized) {
+         this.finalized = finalized;
+      }
+
+      @Override
+      public void finalize() throws Throwable {
+         finalized.countDown();
+         super.finalize();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml
index a888d4e..2e0c8c0 100644
--- a/artemis-jdbc-store/pom.xml
+++ b/artemis-jdbc-store/pom.xml
@@ -40,6 +40,13 @@
          <scope>provided</scope>
          <optional>true</optional>
       </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-commons</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
 
       <dependency>
          <groupId>org.jboss.logging</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index fe5e69d..51f3a3e 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -23,8 +23,10 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -66,11 +68,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
    private boolean started;
 
-   private Timer syncTimer;
+   private JDBCJournalSync syncTimer;
+
+   private final Executor completeExecutor;
 
    private final Object journalLock = new Object();
 
-   private final String timerThread;
+   private final ScheduledExecutorService scheduledExecutorService;
 
    // Track Tx Records
    private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
@@ -78,17 +82,17 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    // Sequence ID for journal records
    private AtomicLong seq = new AtomicLong(0);
 
-   public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) {
+   public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
       super(tableName, jdbcUrl, jdbcDriverClass);
-      timerThread = "Timer JDBC Journal(" + tableName + ")";
       records = new ArrayList<>();
+      this.scheduledExecutorService = scheduledExecutorService;
+      this.completeExecutor = completeExecutor;
    }
 
    @Override
    public void start() throws Exception {
       super.start();
-      syncTimer = new Timer(timerThread, true);
-      syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
+      syncTimer = new JDBCJournalSync(scheduledExecutorService, completeExecutor, SYNC_DELAY, TimeUnit.MILLISECONDS, this);
       started = true;
    }
 
@@ -111,7 +115,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    public synchronized void stop() throws SQLException {
       if (started) {
          synchronized (journalLock) {
-            syncTimer.cancel();
             sync();
             started = false;
             super.stop();
@@ -129,9 +132,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
       if (!started)
          return 0;
 
-      List<JDBCJournalRecord> recordRef = new ArrayList<>();
+      List<JDBCJournalRecord> recordRef;
       synchronized (records) {
-         recordRef.addAll(records);
+         if (records.isEmpty()) {
+            return 0;
+         }
+         recordRef = new ArrayList<>(records);
          records.clear();
       }
 
@@ -271,14 +277,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
             }
          }
       };
-      Thread t = new Thread(r);
-      t.start();
+      completeExecutor.execute(r);
    }
 
    private void appendRecord(JDBCJournalRecord record) throws Exception {
 
       SimpleWaitIOCallback callback = null;
-      if (record.isSync() && record.getIoCompletion() == null) {
+      if (record.isSync() && record.getIoCompletion() == null && !record.isTransactional()) {
          callback = new SimpleWaitIOCallback();
          record.setIoCompletion(callback);
       }
@@ -293,6 +298,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
          }
       }
 
+      syncTimer.delay();
+
       if (callback != null)
          callback.waitCompletion();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
index a224625..53f07b8 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java
@@ -17,18 +17,28 @@
 
 package org.apache.activemq.artemis.jdbc.store.journal;
 
-import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
-public class JDBCJournalSync extends TimerTask {
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+
+public class JDBCJournalSync extends ActiveMQScheduledComponent {
 
    private final JDBCJournalImpl journal;
 
-   public JDBCJournalSync(JDBCJournalImpl journal) {
+   public JDBCJournalSync(ScheduledExecutorService scheduledExecutorService,
+                          Executor executor,
+                          long checkPeriod,
+                          TimeUnit timeUnit,
+                          JDBCJournalImpl journal) {
+      super(scheduledExecutorService, executor, checkPeriod, timeUnit, true);
       this.journal = journal;
    }
 
    @Override
    public void run() {
+      super.run();
       if (journal.isStarted()) {
          journal.sync();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
index 2bdd729..8157e6f 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.jdbc.file;
 
 import java.nio.ByteBuffer;
+import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.List;
@@ -34,9 +35,11 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.apache.derby.jdbc.EmbeddedDriver;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -46,6 +49,9 @@ import static org.junit.Assert.fail;
 
 public class JDBCSequentialFileFactoryTest {
 
+   @Rule
+   public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
+
    private static String connectionUrl = "jdbc:derby:target/data;create=true";
 
    private static String tableName = "FILES";
@@ -67,6 +73,15 @@ public class JDBCSequentialFileFactoryTest {
       factory.destroy();
    }
 
+   @After
+   public void shutdownDerby() {
+      try {
+         DriverManager.getConnection("jdbc:derby:;shutdown=true");
+      }
+      catch (Exception ignored) {
+      }
+   }
+
    @Test
    public void testJDBCFileFactoryStarted() throws Exception {
       assertTrue(factory.isStarted());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
index 9369866..c7e514b 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java
@@ -16,11 +16,15 @@
  */
 package org.apache.activemq.artemis.jdbc.store.journal;
 
+import java.sql.DriverManager;
 import java.util.ArrayList;
 
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
+import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -28,6 +32,8 @@ import static org.junit.Assert.assertTrue;
 
 public class JDBCJournalLoaderCallbackTest {
 
+   @Rule
+   public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
    @Test
    public void testAddDeleteRecord() throws Exception {
 
@@ -46,4 +52,14 @@ public class JDBCJournalLoaderCallbackTest {
       cb.deleteRecord(record.id);
       assertTrue(committedRecords.isEmpty());
    }
+
+   @After
+   public void shutdownDerby() {
+      try {
+         DriverManager.getConnection("jdbc:derby:;shutdown=true");
+      }
+      catch (Exception ignored) {
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
index 7f98ec5..8c41455 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java
@@ -31,6 +31,9 @@ public final class SimpleWaitIOCallback extends SyncIOCompletion {
 
    private volatile int errorCode = 0;
 
+   public SimpleWaitIOCallback() {
+   }
+
    @Override
    public String toString() {
       return SimpleWaitIOCallback.class.getName();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 034391d..7f9b9db 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -110,6 +110,15 @@
             </exclusion>
          </exclusions>
       </dependency>
+
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-commons</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+
    </dependencies>
 
    <profiles>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
index bf90750..37cffb5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
@@ -18,17 +18,19 @@ package org.apache.activemq.artemis.core.paging.impl;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 
 /**
  * This will batch multiple calls waiting to perform a sync in a single call.
  */
-final class PageSyncTimer {
+final class PageSyncTimer extends ActiveMQScheduledComponent {
 
    // Constants -----------------------------------------------------
 
@@ -55,7 +57,8 @@ final class PageSyncTimer {
 
    // Constructors --------------------------------------------------
 
-   PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, long timeSync) {
+   PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) {
+      super(scheduledExecutor, executor, timeSync, TimeUnit.NANOSECONDS, true);
       this.store = store;
       this.scheduledExecutor = scheduledExecutor;
       this.timeSync = timeSync;
@@ -68,12 +71,16 @@ final class PageSyncTimer {
       if (!pendingSync) {
          pendingSync = true;
 
-         // this is a single event
-         scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
+         delay();
       }
       syncOperations.add(ctx);
    }
 
+   public void run() {
+      super.run();
+      tick();
+   }
+
    private void tick() {
       OperationContext[] pendingSyncsArray;
       synchronized (this) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 356ea45..df603be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -170,7 +170,7 @@ public class PagingStoreImpl implements PagingStore {
       this.syncNonTransactional = syncNonTransactional;
 
       if (scheduledExecutor != null && syncTimeout > 0) {
-         this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout);
+         this.syncTimer = new PageSyncTimer(this, scheduledExecutor, executor, syncTimeout);
       }
       else {
          this.syncTimer = null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 8fac438..6b75e74 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -145,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
    protected BatchingIDGenerator idGenerator;
 
+   protected final ScheduledExecutorService scheduledExecutorService;
+
    protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
 
    protected Journal messageJournal;
@@ -156,7 +159,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
    /**
     * Used to create Operation Contexts
     */
-   private final ExecutorFactory executorFactory;
+   protected final ExecutorFactory executorFactory;
 
    final Executor executor;
 
@@ -181,17 +184,20 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
    protected final Set<Long> largeMessagesToDelete = new HashSet<>();
 
-   public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
-      this(config, executorFactory, null);
+   public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) {
+      this(config, executorFactory, scheduledExecutorService, null);
    }
 
    public AbstractJournalStorageManager(Configuration config,
                                         ExecutorFactory executorFactory,
+                                        ScheduledExecutorService scheduledExecutorService,
                                         IOCriticalErrorListener criticalErrorListener) {
       this.executorFactory = executorFactory;
 
       this.ioCriticalErrorListener = criticalErrorListener;
 
+      this.scheduledExecutorService = scheduledExecutorService;
+
       this.config = config;
 
       executor = executorFactory.getExecutor();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 27fe5dc..70d824f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -32,14 +33,17 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 public class JDBCJournalStorageManager extends JournalStorageManager {
 
-   public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory) {
-      super(config, executorFactory);
+   public JDBCJournalStorageManager(Configuration config,
+                                    ExecutorFactory executorFactory,
+                                    ScheduledExecutorService scheduledExecutorService) {
+      super(config, executorFactory, scheduledExecutorService);
    }
 
    public JDBCJournalStorageManager(final Configuration config,
-                                final ExecutorFactory executorFactory,
-                                final IOCriticalErrorListener criticalErrorListener) {
-      super(config, executorFactory, criticalErrorListener);
+                                    final ScheduledExecutorService scheduledExecutorService,
+                                    final ExecutorFactory executorFactory,
+                                    final IOCriticalErrorListener criticalErrorListener) {
+      super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
    }
 
    @Override
@@ -47,16 +51,16 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
       try {
          DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
 
-         Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName());
+         Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
          bindingsJournal = localBindings;
 
-         Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName());
+         Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
          messageJournal = localMessage;
 
          bindingsJournal.start();
          messageJournal.start();
 
-         largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executor);
+         largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executorFactory.getExecutor());
          largeMessagesFactory.start();
       }
       catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index d8c28d2..2aefbef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -81,14 +82,25 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
    private ReplicationManager replicator;
 
+   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) {
+      this(config, executorFactory, scheduledExecutorService, null);
+   }
+
    public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
-      this(config, executorFactory, null);
+      this(config, executorFactory, null, null);
    }
 
    public JournalStorageManager(final Configuration config,
                                 final ExecutorFactory executorFactory,
+                                final ScheduledExecutorService scheduledExecutorService,
                                 final IOCriticalErrorListener criticalErrorListener) {
-      super(config, executorFactory, criticalErrorListener);
+      super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
+   }
+
+   public JournalStorageManager(final Configuration config,
+                                final ExecutorFactory executorFactory,
+                                final IOCriticalErrorListener criticalErrorListener) {
+      super(config, executorFactory, null, criticalErrorListener);
    }
 
    @Override
@@ -732,8 +744,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
    @Override
    public void injectMonitor(FileStoreMonitor monitor) throws Exception {
-      monitor.addStore(journalFF.getDirectory());
-      monitor.addStore(largeMessagesFactory.getDirectory());
-      monitor.addStore(bindingsFF.getDirectory());
+      if (journalFF != null) {
+         monitor.addStore(journalFF.getDirectory());
+      }
+      if (largeMessagesFactory != null) {
+         monitor.addStore(largeMessagesFactory.getDirectory());
+      }
+      if (bindingsFF != null) {
+         monitor.addStore(bindingsFF.getDirectory());
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
deleted file mode 100644
index dadf171..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.logging.Logger;
-
-/** This is for components with a scheduled at a fixed rate. */
-public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
-
-   private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
-   private final ScheduledExecutorService scheduledExecutorService;
-   private long period;
-   private TimeUnit timeUnit;
-   private ScheduledFuture future;
-
-   public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
-                                     long checkPeriod,
-                                     TimeUnit timeUnit) {
-      this.scheduledExecutorService = scheduledExecutorService;
-      this.period = checkPeriod;
-      this.timeUnit = timeUnit;
-   }
-
-   @Override
-   public synchronized void start() {
-      if (future != null) {
-         return;
-      }
-      if (period >= 0) {
-         future = scheduledExecutorService.scheduleWithFixedDelay(this, period, period, timeUnit);
-      }
-      else {
-         logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
-      }
-   }
-
-   public long getPeriod() {
-      return period;
-   }
-
-   public synchronized ActiveMQScheduledComponent setPeriod(long period) {
-      this.period = period;
-      restartIfNeeded();
-      return this;
-   }
-
-   public TimeUnit getTimeUnit() {
-      return timeUnit;
-   }
-
-   public synchronized ActiveMQScheduledComponent setTimeUnit(TimeUnit timeUnit) {
-      this.timeUnit = timeUnit;
-      restartIfNeeded();
-      return this;
-   }
-
-   @Override
-   public synchronized void stop() {
-      if (future == null) {
-         return; // no big deal
-      }
-
-      future.cancel(false);
-      future = null;
-
-   }
-
-   @Override
-   public synchronized boolean isStarted() {
-      return future != null;
-   }
-
-
-   // this will restart the schedulped component upon changes
-   private void restartIfNeeded() {
-      if (isStarted()) {
-         stop();
-         start();
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index f75f6c6..a5259bd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -23,6 +23,7 @@ import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -44,10 +45,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
    private double maxUsage;
 
    public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
+                           Executor executor,
                            long checkPeriod,
                            TimeUnit timeUnit,
                            double maxUsage) {
-      super(scheduledExecutorService, checkPeriod, timeUnit);
+      super(scheduledExecutorService, executor, checkPeriod, timeUnit, false);
       this.maxUsage = maxUsage;
    }
 
@@ -57,7 +59,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
    }
 
    public synchronized FileStoreMonitor addStore(File file) throws IOException {
-      if (file.exists()) {
+      // JDBC storage may return this as null, and we may need to ignore it
+      if (file != null && file.exists()) {
          addStore(Files.getFileStore(file.toPath()));
       }
       return this;
@@ -70,6 +73,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
 
 
    public void run() {
+      super.run();
       tick();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 680af8a..f5b9f26 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1796,11 +1796,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    private StorageManager createStorageManager() {
       if (configuration.isPersistenceEnabled()) {
          if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
-            return new JDBCJournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
+            return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, shutdownOnCriticalIO);
          }
          // Default to File Based Storage Manager, (Legacy default configuration).
          else {
-            return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
+            return new JournalStorageManager(configuration, executorFactory, scheduledPool, shutdownOnCriticalIO);
          }
       }
       return new NullStorageManager();
@@ -1974,7 +1974,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
 
-      this.reloadManager = new ReloadManagerImpl(getScheduledPool(), configuration.getConfigurationFileRefreshPeriod());
+      this.reloadManager = new ReloadManagerImpl(getScheduledPool(), executorFactory.getExecutor(), configuration.getConfigurationFileRefreshPeriod());
 
       if (configuration.getConfigurationUrl() != null && getScheduledPool() != null) {
          reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader());
@@ -2055,7 +2055,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
 
       try {
-         injectMonitor(new FileStoreMonitor(getScheduledPool(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
+         injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
       }
       catch (Exception e) {
          logger.warn(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
index 7686ac5..43fe54c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -37,11 +38,12 @@ public class ReloadManagerImpl extends ActiveMQScheduledComponent implements Rel
 
    private Map<URL, ReloadRegistry> registry = new HashMap<>();
 
-   public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, long checkPeriod) {
-      super(scheduledExecutorService, checkPeriod, TimeUnit.MILLISECONDS);
+   public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod) {
+      super(scheduledExecutorService, executor, checkPeriod, TimeUnit.MILLISECONDS, false);
    }
 
    public void run() {
+      super.run();
       tick();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
index 181604f..e75ebc8 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.reload;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -37,18 +39,22 @@ public class ReloadManagerTest extends ActiveMQTestBase {
 
    private ScheduledExecutorService scheduledExecutorService;
 
+   private ExecutorService executorService;
+
    private ReloadManagerImpl manager;
 
    @Before
    public void startScheduled() {
       scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
-      manager = new ReloadManagerImpl(scheduledExecutorService, 100);
+      executorService = Executors.newSingleThreadExecutor();
+      manager = new ReloadManagerImpl(scheduledExecutorService, executorService, 100);
    }
 
    @After
    public void stopScheduled() {
       manager.stop();
       scheduledExecutorService.shutdown();
+      executorService.shutdown();
       scheduledExecutorService = null;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
index 299f0bc..b00efca 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
@@ -39,8 +39,8 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
-import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
index 9a47d05..7b5629f 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
@@ -23,6 +23,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.file.FileStore;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -39,16 +41,19 @@ import org.junit.Test;
 public class FileStoreMonitorTest extends ActiveMQTestBase {
 
    private ScheduledExecutorService scheduledExecutorService;
+   private ExecutorService executorService;
 
    @Before
    public void startScheduled() {
       scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
+      executorService = Executors.newSingleThreadExecutor();
    }
 
    @After
    public void stopScheduled() {
       scheduledExecutorService.shutdown();
       scheduledExecutorService = null;
+      executorService.shutdown();
    }
 
    @Test
@@ -91,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
       };
 
       final AtomicBoolean fakeReturn = new AtomicBoolean(false);
-      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 100, TimeUnit.MILLISECONDS, 0.999) {
+      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) {
          @Override
          protected double calculateUsage(FileStore store) throws IOException {
             if (fakeReturn.get()) {
@@ -123,7 +128,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
    @Test
    public void testScheduler() throws Exception {
 
-      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 20, TimeUnit.MILLISECONDS, 0.9);
+      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9);
 
       final ReusableLatch latch = new ReusableLatch(5);
       storeMonitor.addStore(getTestDirfile());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index b5fe7b0..9d727a7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -37,11 +37,11 @@ import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
-import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
 import java.net.ServerSocket;
 import java.sql.Connection;
 import java.sql.Driver;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -139,6 +139,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.FileUtil;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.jboss.logging.Logger;
 import org.junit.After;
@@ -218,6 +219,16 @@ public abstract class ActiveMQTestBase extends Assert {
       }
    };
 
+   @After
+   public void shutdownDerby() {
+      try {
+         DriverManager.getConnection("jdbc:derby:;shutdown=true");
+      }
+      catch (Exception ignored) {
+      }
+   }
+
+
    static {
       Random random = new Random();
       DEFAULT_UDP_PORT = 6000 + random.nextInt(1000);
@@ -550,60 +561,10 @@ public abstract class ActiveMQTestBase extends Assert {
       }
    }
 
-   private static int failedGCCalls = 0;
-
    public static void forceGC() {
-
-      if (failedGCCalls >= 10) {
-         log.info("ignoring forceGC call since it seems System.gc is not working anyways");
-         return;
-      }
-      log.info("#test forceGC");
-      CountDownLatch finalized = new CountDownLatch(1);
-      WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
-
-      long timeout = System.currentTimeMillis() + 1000;
-
-      // A loop that will wait GC, using the minimal time as possible
-      while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
-         System.gc();
-         System.runFinalization();
-         try {
-            finalized.await(100, TimeUnit.MILLISECONDS);
-         }
-         catch (InterruptedException e) {
-         }
-      }
-
-      if (dumbReference.get() != null) {
-         failedGCCalls++;
-         log.info("It seems that GC is disabled at your VM");
-      }
-      else {
-         // a success would reset the count
-         failedGCCalls = 0;
-      }
-      log.info("#test forceGC Done ");
+      ThreadLeakCheckRule.forceGC();
    }
 
-   public static void forceGC(final Reference<?> ref, final long timeout) {
-      long waitUntil = System.currentTimeMillis() + timeout;
-      // A loop that will wait GC, using the minimal time as possible
-      while (ref.get() != null && System.currentTimeMillis() < waitUntil) {
-         ArrayList<String> list = new ArrayList<>();
-         for (int i = 0; i < 1000; i++) {
-            list.add("Some string with garbage with concatenation " + i);
-         }
-         list.clear();
-         list = null;
-         System.gc();
-         try {
-            Thread.sleep(500);
-         }
-         catch (InterruptedException e) {
-         }
-      }
-   }
 
    /**
     * Verifies whether weak references are released after a few GCs.
@@ -2514,19 +2475,4 @@ public abstract class ActiveMQTestBase extends Assert {
    public static void waitForLatch(CountDownLatch latch) throws InterruptedException {
       assertTrue("Latch has got to return within a minute", latch.await(1, TimeUnit.MINUTES));
    }
-
-   protected static class DumbReference {
-
-      private CountDownLatch finalized;
-
-      public DumbReference(CountDownLatch finalized) {
-         this.finalized = finalized;
-      }
-
-      @Override
-      public void finalize() throws Throwable {
-         finalized.countDown();
-         super.finalize();
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
deleted file mode 100644
index f7236e5..0000000
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.util;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.rules.ExternalResource;
-
-/**
- * This is useful to make sure you won't have leaking threads between tests
- */
-public class ThreadLeakCheckRule extends ExternalResource {
-   private static Set<String> knownThreads = new HashSet<>();
-
-   boolean enabled = true;
-
-   private Map<Thread, StackTraceElement[]> previousThreads;
-
-   public void disable() {
-      enabled = false;
-   }
-
-   /**
-    * Override to set up your specific external resource.
-    *
-    * @throws if setup fails (which will disable {@code after}
-    */
-   @Override
-   protected void before() throws Throwable {
-      // do nothing
-
-      previousThreads = Thread.getAllStackTraces();
-
-   }
-
-   /**
-    * Override to tear down your specific external resource.
-    */
-   @Override
-   protected void after() {
-      try {
-         if (enabled) {
-            boolean failed = true;
-
-            boolean failedOnce = false;
-
-            long timeout = System.currentTimeMillis() + 60000;
-            while (failed && timeout > System.currentTimeMillis()) {
-               failed = checkThread();
-
-               if (failed) {
-                  failedOnce = true;
-                  ActiveMQTestBase.forceGC();
-                  try {
-                     Thread.sleep(500);
-                  }
-                  catch (Throwable e) {
-                  }
-               }
-            }
-
-            if (failed) {
-               Assert.fail("Thread leaked");
-            }
-            else if (failedOnce) {
-               System.out.println("******************** Threads cleared after retries ********************");
-               System.out.println();
-            }
-
-         }
-         else {
-            enabled = true;
-         }
-      }
-      finally {
-         // clearing just to help GC
-         previousThreads = null;
-      }
-
-   }
-
-   public static void removeKownThread(String name) {
-      knownThreads.remove(name);
-   }
-
-   public static void addKownThread(String name) {
-      knownThreads.add(name);
-   }
-
-   private boolean checkThread() {
-      boolean failedThread = false;
-
-      Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
-
-      if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) {
-
-
-         for (Thread aliveThread : postThreads.keySet()) {
-            if (aliveThread.isAlive() && !isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) {
-               if (!failedThread) {
-                  System.out.println("*********************************************************************************");
-                  System.out.println("LEAKING THREADS");
-               }
-               failedThread = true;
-               System.out.println("=============================================================================");
-               System.out.println("Thread " + aliveThread + " is still alive with the following stackTrace:");
-               StackTraceElement[] elements = postThreads.get(aliveThread);
-               for (StackTraceElement el : elements) {
-                  System.out.println(el);
-               }
-            }
-
-         }
-         if (failedThread) {
-            System.out.println("*********************************************************************************");
-         }
-      }
-
-
-      return failedThread;
-   }
-
-
-   /**
-    * if it's an expected thread... we will just move along ignoring it
-    *
-    * @param thread
-    * @return
-    */
-   private boolean isExpectedThread(Thread thread) {
-      final String threadName = thread.getName();
-      final ThreadGroup group = thread.getThreadGroup();
-      final boolean isSystemThread = group != null && "system".equals(group.getName());
-      final String javaVendor = System.getProperty("java.vendor");
-
-      if (threadName.contains("SunPKCS11")) {
-         return true;
-      }
-      else if (threadName.contains("Attach Listener")) {
-         return true;
-      }
-      else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
-         return true;
-      }
-      else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("ClassCache Reaper")) {
-         return true;
-      }
-      else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) {
-         return true;
-      }
-      else if (threadName.contains("globalEventExecutor")) {
-         return true;
-      }
-      else if (threadName.contains("threadDeathWatcher")) {
-         return true;
-      }
-      else if (threadName.contains("netty-threads")) {
-         // This is ok as we use EventLoopGroup.shutdownGracefully() which will shutdown things with a bit of delay
-         // if the EventLoop's are still busy.
-         return true;
-      }
-      else if (threadName.contains("threadDeathWatcher")) {
-         //another netty thread
-         return true;
-      }
-      else if (threadName.contains("derby")) {
-         // The derby engine is initialized once, and lasts the lifetime of the VM
-         return true;
-      }
-      else if (threadName.contains("Abandoned connection cleanup thread")) {
-         // MySQL Engine checks for abandoned connections
-         return true;
-      }
-      else if (threadName.contains("Timer")) {
-         // The timer threads in Derby and JDBC use daemon and shutdown once user threads exit.
-         return true;
-      }
-      else if (threadName.contains("hawtdispatch")) {
-         // Static workers used by MQTT client.
-         return true;
-      }
-      else {
-         for (StackTraceElement element : thread.getStackTrace()) {
-            if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
-               return true;
-            }
-         }
-
-         for (String known: knownThreads) {
-            if (threadName.contains(known)) {
-               return true;
-            }
-         }
-
-         return false;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/activemq5-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml
index 2c7d411..6cafa8d 100644
--- a/tests/activemq5-unit-tests/pom.xml
+++ b/tests/activemq5-unit-tests/pom.xml
@@ -55,6 +55,13 @@
          <version>${project.version}</version>
          <type>test-jar</type>
       </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-commons</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
 
       <dependency>
          <groupId>org.apache.activemq</groupId>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/extra-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml
index 415dfc6..497b33a 100644
--- a/tests/extra-tests/pom.xml
+++ b/tests/extra-tests/pom.xml
@@ -106,6 +106,13 @@
          <type>test-jar</type>
       </dependency>
       <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-commons</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
          <groupId>org.apache.activemq.tests</groupId>
          <artifactId>unit-tests</artifactId>
          <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 57b9171..c6f9834 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -54,6 +54,13 @@
          <type>test-jar</type>
       </dependency>
       <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-commons</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
          <groupId>org.apache.activemq.tests</groupId>
          <artifactId>unit-tests</artifactId>
          <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
index 5bf36e9..234a9fb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
@@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
 import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
-import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.jgroups.JChannel;
 import org.jgroups.conf.PlainConfigurator;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
index 405acde..fc3d9ff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
@@ -16,9 +16,14 @@
  */
 package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
 
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.journal.IOCompletion;
@@ -26,7 +31,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -45,10 +50,32 @@ public class JDBCJournalTest extends ActiveMQTestBase {
 
    private String jdbcUrl;
 
+   private ScheduledExecutorService scheduledExecutorService;
+
+   private ExecutorService executorService;
+
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      journal.destroy();
+      try {
+         DriverManager.getConnection("jdbc:derby:;shutdown=true");
+      }
+      catch (Exception ignored) {
+      }
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService = null;
+      executorService.shutdown();
+      executorService = null;
+
+   }
+
    @Before
    public void setup() throws Exception {
+      scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
+      executorService = Executors.newSingleThreadExecutor();
       jdbcUrl = "jdbc:derby:target/data;create=true";
-      journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS);
+      journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS, scheduledExecutorService, executorService);
       journal.start();
    }
 
@@ -59,7 +86,6 @@ public class JDBCJournalTest extends ActiveMQTestBase {
          journal.appendAddRecord(i, (byte) 1, new byte[0], true);
       }
 
-      Thread.sleep(3000);
       assertEquals(noRecords, journal.getNumberOfRecords());
    }
 
@@ -122,9 +148,4 @@ public class JDBCJournalTest extends ActiveMQTestBase {
       assertEquals(noRecords + (noTxRecords * noTx), recordInfos.size());
    }
 
-   @After
-   @Override
-   public void tearDown() throws Exception {
-      journal.destroy();
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index f1b602f..096e45d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -1634,7 +1634,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
       final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
 
-      final JournalStorageManager storage = new JournalStorageManager(config, factory, null);
+      final JournalStorageManager storage = new JournalStorageManager(config, factory);
 
       storage.start();
       storage.loadInternalOnly();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index de89e18..5e24d55 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
 
    @Override
    protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
-      return new JournalStorageManager(configuration, execFactory, null) {
+      return new JournalStorageManager(configuration, execFactory) {
          @Override
          public void deleteMessage(final long messageID) throws Exception {
             deletedMessage.add(messageID);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
index 15e96b2..1c51b36 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
@@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase {
 
       PostOffice postOffice = new FakePostOffice();
 
-      final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, null);
+      final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory);
 
       try {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index 886cde3..cdf6743 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
@@ -47,6 +49,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
 
    protected ExecutorFactory execFactory;
 
+   protected ScheduledExecutorService scheduledExecutorService;
+
    protected StorageManager journal;
 
    protected JMSStorageManager jmsJournal;
@@ -73,6 +77,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
       super.setUp();
 
       execFactory = getOrderedExecutor();
+
+      scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
    }
 
    @Override
@@ -103,6 +109,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
          jmsJournal = null;
       }
 
+      scheduledExecutorService.shutdown();
+
       destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"}));
       super.tearDown();
       if (exception != null)
@@ -132,7 +140,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
     * @param configuration
     */
    protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
-      JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, null);
+      JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory);
       addActiveMQComponent(jsm);
       return jsm;
    }
@@ -141,7 +149,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
     * @param configuration
     */
    protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
-      JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, null);
+      JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService);
       addActiveMQComponent(jsm);
       return jsm;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 6ff0cf0..e05bdb2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -440,7 +440,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
     * @throws Exception
     */
    private JournalStorageManager getStorage() throws Exception {
-      return new JournalStorageManager(createDefaultInVMConfig(), factory, null);
+      return new JournalStorageManager(createDefaultInVMConfig(), factory);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f8278ec9/tests/unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/unit-tests/pom.xml b/tests/unit-tests/pom.xml
index ef2d902..198231e 100644
--- a/tests/unit-tests/pom.xml
+++ b/tests/unit-tests/pom.xml
@@ -40,6 +40,13 @@
       </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-commons</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-server</artifactId>
          <version>${project.version}</version>
          <scope>test</scope>


Mime
View raw message