activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-1732 AMQP anonymous producer not blocked on max-disk-usage
Date Wed, 01 Aug 2018 01:40:41 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x ae1edf6c6 -> 73b3ebff1


ARTEMIS-1732 AMQP anonymous producer not blocked on max-disk-usage

Anonymous senders (those created without a target address) are not
blocked when max-disk-usage is reached. The cause is that when such
a sender is created on the broker, the broker doesn't check the
disk/memory usage and gives out the credit immediately.

Squashed with:
-----

ARTEMIS-1732 I simplified some of the changes performed at the previous commit.

Also I changed GlobalDiskFullTest to actually block the senders.
I moved the Runnables from PagingManager into the Util as AtomicRunnable.

(cherry picked from commit 0e36e072bdf0c4636623aacbd15912857770c73f)
(cherry picked from commit 53e1d601601204dc2aa587fcb3046d5c1d6d026d)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/73b3ebff
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/73b3ebff
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/73b3ebff

Branch: refs/heads/2.6.x
Commit: 73b3ebff1a8c2aef84d2df90eb671258201b9fa4
Parents: ae1edf6
Author: Howard Gao <howard.gao@gmail.com>
Authored: Mon Mar 12 10:33:09 2018 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jul 31 21:40:17 2018 -0400

----------------------------------------------------------------------
 .../artemis/utils/runnables/AtomicRunnable.java |  47 +++++++
 .../runnables/AtomicRunnableWithDelegate.java   |  32 +++++
 .../amqp/broker/AMQPSessionCallback.java        |  27 ++--
 .../artemis/core/paging/PagingManager.java      |   6 +
 .../core/paging/impl/PagingManagerImpl.java     |  39 +++++-
 .../core/paging/impl/PagingStoreImpl.java       |  43 ++----
 .../integration/amqp/GlobalDiskFullTest.java    | 134 +++++++++++++++++++
 .../tests/unit/util/FakePagingManager.java      |   5 +
 8 files changed, 281 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
new file mode 100644
index 0000000..f1f53ce
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+public abstract class AtomicRunnable implements Runnable {
+
+
+   public static Runnable checkAtomic(Runnable run) {
+      if (run instanceof AtomicRunnable) {
+         return run;
+      } else {
+         return new AtomicRunnableWithDelegate(run);
+      }
+   }
+
+   private volatile int ran;
+
+   private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
+      AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");
+
+   @Override
+   public void run() {
+      if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
+         atomicRun();
+      }
+   }
+
+   public abstract void atomicRun();
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
new file mode 100644
index 0000000..d1583de
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+public class AtomicRunnableWithDelegate extends AtomicRunnable {
+
+   private final Runnable runnable;
+
+   public AtomicRunnableWithDelegate(Runnable runnable) {
+      this.runnable = runnable;
+   }
+
+   @Override
+   public void atomicRun() {
+      runnable.run();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6461bb2..86c0687 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -576,7 +577,8 @@ public class AMQPSessionCallback implements SessionCallback {
                                    final int threshold,
                                    final Receiver receiver) {
       try {
-         if (address == null) {
+         PagingManager pagingManager = manager.getServer().getPagingManager();
+         Runnable creditRunnable = () -> {
             connection.lock();
             try {
                receiver.flow(credits);
@@ -584,23 +586,14 @@ public class AMQPSessionCallback implements SessionCallback {
                connection.unlock();
             }
             connection.flush();
-            return;
+         };
+
+         if (address == null) {
+            pagingManager.checkMemory(creditRunnable);
+         } else {
+            final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
+            store.checkMemory(creditRunnable);
          }
-         final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
-         store.checkMemory(new Runnable() {
-            @Override
-            public void run() {
-               connection.lock();
-               try {
-                  if (receiver.getRemoteCredit() <= threshold) {
-                     receiver.flow(credits);
-                  }
-               } finally {
-                  connection.unlock();
-               }
-               connection.flush();
-            }
-         });
       } catch (Exception e) {
          throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index 4d472e1..5d8461e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -111,4 +111,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
       return 0;
    }
 
+   /**
+    * Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
+    * @param runWhenAvailable
+    */
+   void checkMemory(Runnable runWhenAvailable);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index bca70cf..8893984 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -20,9 +20,12 @@ import java.nio.file.FileStore;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.jboss.logging.Logger;
 
 public final class PagingManagerImpl implements PagingManager {
@@ -74,6 +78,10 @@ public final class PagingManagerImpl implements PagingManager {
 
    private volatile boolean diskFull = false;
 
+   private final Executor memoryExecutor;
+
+   private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
+
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions
= new ConcurrentHashMap<>();
 
    private ActiveMQScheduledComponent scheduledComponent = null;
@@ -102,6 +110,7 @@ public final class PagingManagerImpl implements PagingManager {
       this.addressSettingsRepository = addressSettingsRepository;
       addressSettingsRepository.registerListener(this);
       this.maxSize = maxSize;
+      this.memoryExecutor = pagingSPI.newExecutor();
    }
 
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
@@ -154,6 +163,13 @@ public final class PagingManagerImpl implements PagingManager {
 
    protected void checkMemoryRelease() {
       if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) &&
!blockedStored.isEmpty()) {
+         if (!memoryCallback.isEmpty()) {
+            if (memoryExecutor != null) {
+               memoryExecutor.execute(this::memoryReleased);
+            } else {
+               memoryReleased();
+            }
+         }
          Iterator<PagingStore> storeIterator = blockedStored.iterator();
          while (storeIterator.hasNext()) {
             PagingStore store = storeIterator.next();
@@ -187,7 +203,7 @@ public final class PagingManagerImpl implements PagingManager {
 
       @Override
       public void under(FileStore store, double usage) {
-         if (diskFull) {
+         if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
             ActiveMQServerLogger.LOGGER.diskCapacityRestored();
             diskFull = false;
             checkMemoryRelease();
@@ -206,6 +222,27 @@ public final class PagingManagerImpl implements PagingManager {
    }
 
    @Override
+   public void checkMemory(final Runnable runWhenAvailable) {
+
+      if (isGlobalFull()) {
+         memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+         return;
+      }
+      runWhenAvailable.run();
+   }
+
+
+   private void memoryReleased() {
+      Runnable runnable;
+
+      while ((runnable = memoryCallback.poll()) != null) {
+         runnable.run();
+      }
+   }
+
+
+
+   @Override
    public boolean isGlobalFull() {
       return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 6a07ffc..06b42a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.jboss.logging.Logger;
 
 /**
@@ -641,40 +642,16 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
-   private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
+   private final Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
 
-   private class MemoryFreedRunnablesExecutor implements Runnable {
+   private void memoryReleased() {
+      Runnable runnable;
 
-      @Override
-      public void run() {
-         Runnable runnable;
-
-         while ((runnable = onMemoryFreedRunnables.poll()) != null) {
-            runnable.run();
-         }
+      while ((runnable = onMemoryFreedRunnables.poll()) != null) {
+         runnable.run();
       }
    }
 
-   private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
-
-   // To be used when the memory is oversized either by local settings or global settings
on blocking addresses
-   private static final class OverSizedRunnable implements Runnable {
-
-      private final AtomicBoolean ran = new AtomicBoolean(false);
-
-      private final Runnable runnable;
-
-      private OverSizedRunnable(final Runnable runnable) {
-         this.runnable = runnable;
-      }
-
-      @Override
-      public void run() {
-         if (ran.compareAndSet(false, true)) {
-            runnable.run();
-         }
-      }
-   }
 
    @Override
    public boolean checkMemory(final Runnable runWhenAvailable) {
@@ -685,9 +662,8 @@ public class PagingStoreImpl implements PagingStore {
          }
       } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK
&& (maxSize != -1 || usingGlobalMaxSize)) {
          if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >
maxSize || pagingManager.isGlobalFull()) {
-            OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
 
-            onMemoryFreedRunnables.add(ourRunnable);
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
 
             // We check again to avoid a race condition where the size can come down just
after the element
             // has been added, but the check to execute was done before the element was added
@@ -695,7 +671,7 @@ public class PagingStoreImpl implements PagingStore {
             // MUCH better performance in a highly concurrent environment
             if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize
|| maxSize < 0)) {
                // run it now
-               ourRunnable.run();
+               runWhenAvailable.run();
             } else {
                if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
                   pagingManager.addBlockedStore(this);
@@ -710,7 +686,6 @@ public class PagingStoreImpl implements PagingStore {
                   blocking.set(true);
                }
             }
-
             return true;
          }
       }
@@ -756,7 +731,7 @@ public class PagingStoreImpl implements PagingStore {
    public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
       if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
          if (!onMemoryFreedRunnables.isEmpty()) {
-            executor.execute(memoryFreedRunnablesExecutor);
+            executor.execute(this::memoryReleased);
             if (blocking.get()) {
                ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(),
maxSize);
                blocking.set(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
new file mode 100644
index 0000000..0e0f86d
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.nio.file.FileStore;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class GlobalDiskFullTest extends AmqpClientTestSupport {
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.setDiskScanPeriod(100);
+   }
+
+   @Test
+   public void testProducerOnDiskFull() throws Exception {
+      FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
+      final CountDownLatch latch = new CountDownLatch(1);
+      monitor.addCallback(new FileStoreMonitor.Callback() {
+
+         @Override
+         public void tick(FileStore store, double usage) {
+         }
+
+         @Override
+         public void over(FileStore store, double usage) {
+            latch.countDown();
+         }
+         @Override
+         public void under(FileStore store, double usage) {
+         }
+      });
+
+      Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
+
+      AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+         byte[] payload = new byte[1000];
+
+
+         AmqpSender anonSender = session.createSender();
+
+         CountDownLatch sentWithName = new CountDownLatch(1);
+         CountDownLatch sentAnon = new CountDownLatch(1);
+
+         Thread threadWithName = new Thread() {
+            @Override
+            public void run() {
+
+               try {
+                  final AmqpMessage message = new AmqpMessage();
+                  message.setBytes(payload);
+                  sender.setSendTimeout(-1);
+                  sender.send(message);
+               } catch (Exception e) {
+                  e.printStackTrace();
+               } finally {
+                  sentWithName.countDown();
+               }
+            }
+         };
+
+         threadWithName.start();
+
+
+         Thread threadWithAnon = new Thread() {
+            @Override
+            public void run() {
+               try {
+                  final AmqpMessage message = new AmqpMessage();
+                  message.setBytes(payload);
+                  anonSender.setSendTimeout(-1);
+                  message.setAddress(getQueueName());
+                  anonSender.send(message);
+               } catch (Exception e) {
+                  e.printStackTrace();
+               } finally {
+                  sentAnon.countDown();
+               }
+            }
+         };
+
+         threadWithAnon.start();
+
+         Assert.assertFalse("Thread sender should be blocked", sentWithName.await(500, TimeUnit.MILLISECONDS));
+         Assert.assertFalse("Thread sender anonymous should be blocked", sentAnon.await(500,
TimeUnit.MILLISECONDS));
+
+         monitor.setMaxUsage(100.0);
+
+         Assert.assertTrue("Thread sender should be released", sentWithName.await(30, TimeUnit.SECONDS));
+         Assert.assertTrue("Thread sender anonymous should be released", sentAnon.await(30,
TimeUnit.SECONDS));
+
+         threadWithName.join(TimeUnit.SECONDS.toMillis(30));
+         threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
+         Assert.assertFalse(threadWithName.isAlive());
+         Assert.assertFalse(threadWithAnon.isAlive());
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index d1012a6..94a9d79 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -35,6 +35,11 @@ public final class FakePagingManager implements PagingManager {
    }
 
    @Override
+   public void checkMemory(Runnable runWhenAvailable) {
+
+   }
+
+   @Override
    public void addTransaction(final PageTransactionInfo pageTransaction) {
    }
 


Mime
View raw message