activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-1732 AMQP anonymous producer not blocked on max-disk-usage
Date Wed, 01 Aug 2018 01:39:28 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 168a7cfea -> a7e4ce490


ARTEMIS-1732 AMQP anonymous producer not blocked on max-disk-usage

Anonymous senders (those created without a target address) are not
blocked when max-disk-usage is reached. The cause is that when such
a sender is created on the broker, the broker doesn't check the
disk/memory usage and gives out the credit immediately.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/53e1d601
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/53e1d601
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/53e1d601

Branch: refs/heads/master
Commit: 53e1d601601204dc2aa587fcb3046d5c1d6d026d
Parents: 168a7cf
Author: Howard Gao <howard.gao@gmail.com>
Authored: Mon Mar 12 10:33:09 2018 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jul 31 21:39:01 2018 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 27 +++---
 .../artemis/core/paging/PagingManager.java      | 55 +++++++++++-
 .../artemis/core/paging/PagingStore.java        |  7 +-
 .../core/paging/impl/PagingManagerImpl.java     | 44 +++++++++-
 .../core/paging/impl/PagingStoreImpl.java       | 44 +---------
 .../core/server/ActiveMQServerLogger.java       | 13 +++
 .../core/server/files/FileStoreMonitor.java     |  9 +-
 .../core/server/files/FileStoreMonitorTest.java | 10 ---
 .../integration/amqp/GlobalDiskFullTest.java    | 89 ++++++++++++++++++++
 .../tests/unit/util/FakePagingManager.java      |  7 +-
 10 files changed, 224 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6461bb2..1f5ccbc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -576,31 +577,25 @@ public class AMQPSessionCallback implements SessionCallback {
                                    final int threshold,
                                    final Receiver receiver) {
       try {
-         if (address == null) {
+         PagingManager pagingManager = manager.getServer().getPagingManager();
+         Runnable creditRunnable = () -> {
             connection.lock();
             try {
-               receiver.flow(credits);
+               if (receiver.getRemoteCredit() <= threshold) {
+                  receiver.flow(credits);
+               }
             } finally {
                connection.unlock();
             }
             connection.flush();
+         };
+
+         if (address == null) {
+            pagingManager.checkMemory(creditRunnable);
             return;
          }
          final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
-         store.checkMemory(new Runnable() {
-            @Override
-            public void run() {
-               connection.lock();
-               try {
-                  if (receiver.getRemoteCredit() <= threshold) {
-                     receiver.flow(credits);
-                  }
-               } finally {
-                  connection.unlock();
-               }
-               connection.flush();
-            }
-         });
+         store.checkMemory(creditRunnable);
       } catch (Exception e) {
          throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index 4d472e1..c8eb2ec 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -17,6 +17,9 @@
 package org.apache.activemq.artemis.core.paging;
 
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -79,7 +82,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
 
    void resumeCleanup();
 
-   void addBlockedStore(PagingStore store);
+   void addBlockedStore(Blockable store);
 
    void injectMonitor(FileStoreMonitor monitor) throws Exception;
 
@@ -111,4 +114,54 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
       return 0;
    }
 
+   boolean checkMemory(Runnable runnable);
+
+   // To be used when the memory is oversized either by local settings or global settings on blocking addresses
+   final class OverSizedRunnable implements Runnable {
+
+      private final AtomicBoolean ran = new AtomicBoolean(false);
+
+      private final Runnable runnable;
+
+      public OverSizedRunnable(final Runnable runnable) {
+         this.runnable = runnable;
+      }
+
+      @Override
+      public void run() {
+         if (ran.compareAndSet(false, true)) {
+            runnable.run();
+         }
+      }
+   }
+
+   interface Blockable {
+      /**
+       * It will return true if the destination is leaving blocking.
+       */
+      boolean checkReleasedMemory();
+   }
+
+   final class MemoryFreedRunnablesExecutor implements Runnable {
+
+      private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
+
+      public void addRunnable(PagingManager.OverSizedRunnable runnable) {
+         onMemoryFreedRunnables.add(runnable);
+      }
+
+      @Override
+      public void run() {
+         Runnable runnable;
+
+         while ((runnable = onMemoryFreedRunnables.poll()) != null) {
+            runnable.run();
+         }
+      }
+
+      public boolean isEmpty() {
+         return onMemoryFreedRunnables.isEmpty();
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 4dd8bf8..27e8c0f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  *
  * @see PagingManager
  */
-public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
+public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, PagingManager.Blockable {
 
    SimpleString getAddress();
 
@@ -132,11 +132,6 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
    boolean isRejectingMessages();
 
    /**
-    * It will return true if the destination is leaving blocking.
-    */
-   boolean checkReleasedMemory();
-
-   /**
     * Write lock the PagingStore.
     *
     * @param timeout milliseconds to wait for the lock. If value is {@literal -1} then wait

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index bca70cf..878f918 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -56,7 +57,7 @@ public final class PagingManagerImpl implements PagingManager {
     */
    private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
 
-   private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>();
+   private final Set<Blockable> blockedStored = new ConcurrentHashSet<>();
 
    private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>();
 
@@ -78,6 +79,9 @@ public final class PagingManagerImpl implements PagingManager {
 
    private ActiveMQScheduledComponent scheduledComponent = null;
 
+   private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
+
+   private final Executor executor;
    // Static
    // --------------------------------------------------------------------------------------------------------------------------
 
@@ -102,6 +106,7 @@ public final class PagingManagerImpl implements PagingManager {
       this.addressSettingsRepository = addressSettingsRepository;
       addressSettingsRepository.registerListener(this);
       this.maxSize = maxSize;
+      executor = pagingStoreFactory.newExecutor();
    }
 
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
@@ -110,7 +115,7 @@ public final class PagingManagerImpl implements PagingManager {
    }
 
    @Override
-   public void addBlockedStore(PagingStore store) {
+   public void addBlockedStore(Blockable store) {
       blockedStored.add(store);
    }
 
@@ -152,11 +157,42 @@ public final class PagingManagerImpl implements PagingManager {
       return globalSizeBytes.get();
    }
 
+   @Override
+   public boolean checkMemory(final Runnable runWhenAvailable) {
+      if (isGlobalFull()) {
+         OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
+
+         memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
+         addBlockedStore(() -> {
+            if (!isGlobalFull()) {
+               if (!memoryFreedRunnablesExecutor.isEmpty()) {
+                  executor.execute(memoryFreedRunnablesExecutor);
+                  ActiveMQServerLogger.LOGGER.unblockingGlobalMessageProduction(getGlobalSize());
+                  return true;
+               }
+            }
+            return false;
+         });
+
+         if (isDiskFull()) {
+            ActiveMQServerLogger.LOGGER.blockingGlobalDiskFull();
+         } else {
+            ActiveMQServerLogger.LOGGER.blockingGlobalMessageProduction(getGlobalSize());
+         }
+
+         return true;
+      }
+
+      runWhenAvailable.run();
+
+      return true;
+   }
+
    protected void checkMemoryRelease() {
      if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
-         Iterator<PagingStore> storeIterator = blockedStored.iterator();
+         Iterator<Blockable> storeIterator = blockedStored.iterator();
          while (storeIterator.hasNext()) {
-            PagingStore store = storeIterator.next();
+            Blockable store = storeIterator.next();
             if (store.checkReleasedMemory()) {
                storeIterator.remove();
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 39687b0..74212ce 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -23,9 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -641,40 +639,7 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
-   private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
-
-   private class MemoryFreedRunnablesExecutor implements Runnable {
-
-      @Override
-      public void run() {
-         Runnable runnable;
-
-         while ((runnable = onMemoryFreedRunnables.poll()) != null) {
-            runnable.run();
-         }
-      }
-   }
-
-   private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
-
-   // To be used when the memory is oversized either by local settings or global settings on blocking addresses
-   private static final class OverSizedRunnable implements Runnable {
-
-      private final AtomicBoolean ran = new AtomicBoolean(false);
-
-      private final Runnable runnable;
-
-      private OverSizedRunnable(final Runnable runnable) {
-         this.runnable = runnable;
-      }
-
-      @Override
-      public void run() {
-         if (ran.compareAndSet(false, true)) {
-            runnable.run();
-         }
-      }
-   }
+   private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor();
 
    @Override
    public boolean checkMemory(final Runnable runWhenAvailable) {
@@ -685,9 +650,9 @@ public class PagingStoreImpl implements PagingStore {
          }
       } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
          if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) {
-            OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
+            PagingManager.OverSizedRunnable ourRunnable = new PagingManager.OverSizedRunnable(runWhenAvailable);
 
-            onMemoryFreedRunnables.add(ourRunnable);
+            memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
 
             // We check again to avoid a race condition where the size can come down just after the element
             // has been added, but the check to execute was done before the element was added
@@ -710,7 +675,6 @@ public class PagingStoreImpl implements PagingStore {
                   blocking.set(true);
                }
             }
-
             return true;
          }
       }
@@ -755,7 +719,7 @@ public class PagingStoreImpl implements PagingStore {
 
    public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
       if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
-         if (!onMemoryFreedRunnables.isEmpty()) {
+         if (!memoryFreedRunnablesExecutor.isEmpty()) {
             executor.execute(memoryFreedRunnablesExecutor);
             if (blocking.get()) {
                ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index b10d652..96fffe5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1950,4 +1950,17 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
    void consumerCountError(String reason);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 224096, value = "Disk Full! Blocking message production. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)
+   void blockingGlobalDiskFull();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 224097, value = "Blocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
+   void blockingGlobalMessageProduction(long globalSize);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 224098, value = "Unblocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT)
+   void unblockingGlobalMessageProduction(long globalSize);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index ad59117..957661c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -150,11 +150,14 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
 
    public interface Callback {
 
-      void tick(FileStore store, double usage);
+      default void tick(FileStore store, double usage) {
+      }
 
-      void over(FileStore store, double usage);
+      default void over(FileStore store, double usage) {
+      }
 
-      void under(FileStore store, double usage);
+      default void under(FileStore store, double usage) {
+      }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
index b91d3de..e4f27c3 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
@@ -137,16 +137,6 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
             System.out.println("TickS::" + usage);
             latch.countDown();
          }
-
-         @Override
-         public void over(FileStore store, double usage) {
-
-         }
-
-         @Override
-         public void under(FileStore store, double usage) {
-
-         }
       });
       storeMonitor.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
new file mode 100644
index 0000000..d664013
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+import java.net.URI;
+import java.nio.file.FileStore;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class GlobalDiskFullTest extends AmqpClientTestSupport {
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.setDiskScanPeriod(100);
+   }
+
+   @Test
+   public void testProducerOnDiskFull() throws Exception {
+      FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
+      final CountDownLatch latch = new CountDownLatch(1);
+      monitor.addCallback(new FileStoreMonitor.Callback() {
+         @Override
+         public void over(FileStore store, double usage) {
+            latch.countDown();
+         }
+         @Override
+         public void under(FileStore store, double usage) {
+         }
+      });
+      latch.await(2, TimeUnit.SECONDS);
+
+      AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+         final AmqpMessage message = new AmqpMessage();
+         byte[] payload = new byte[1000];
+         message.setBytes(payload);
+
+         sender.setSendTimeout(1000);
+         sender.send(message);
+
+         org.apache.activemq.artemis.core.server.Queue queueView = getProxyToQueue(getQueueName());
+         assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
+
+         AmqpSender anonSender = session.createSender();
+         final AmqpMessage message1 = new AmqpMessage();
+         message1.setBytes(payload);
+         message1.setAddress(getQueueName());
+
+         anonSender.setSendTimeout(1000);
+         anonSender.send(message1);
+
+         queueView = getProxyToQueue(getQueueName());
+         assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
+
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53e1d601/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index d1012a6..3431655 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -30,7 +30,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 public final class FakePagingManager implements PagingManager {
 
    @Override
-   public void addBlockedStore(PagingStore store) {
+   public void addBlockedStore(Blockable store) {
 
    }
 
@@ -115,6 +115,11 @@ public final class FakePagingManager implements PagingManager {
       return false;
    }
 
+   @Override
+   public boolean checkMemory(Runnable runnable) {
+      return false;
+   }
+
    /*
     * (non-Javadoc)
     * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()


Mime
View raw message