activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1732 I simplified some of the changes performed at the previous commit.
Date Wed, 01 Aug 2018 01:39:29 GMT
ARTEMIS-1732 I simplified some of the changes performed at the previous commit.

Also I changed GlobalDiskFullTest to actually block the senders.
I moved the Runnables from PagingManager into the Util as AtomicRunnable.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e36e072
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e36e072
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e36e072

Branch: refs/heads/master
Commit: 0e36e072bdf0c4636623aacbd15912857770c73f
Parents: 53e1d60
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Jul 31 21:08:46 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jul 31 21:39:04 2018 -0400

----------------------------------------------------------------------
 .../artemis/utils/runnables/AtomicRunnable.java | 47 ++++++++++++
 .../runnables/AtomicRunnableWithDelegate.java   | 32 ++++++++
 .../amqp/broker/AMQPSessionCallback.java        | 10 +--
 .../artemis/core/paging/PagingManager.java      | 59 ++------------
 .../artemis/core/paging/PagingStore.java        |  7 +-
 .../core/paging/impl/PagingManagerImpl.java     | 81 ++++++++++----------
 .../core/paging/impl/PagingStoreImpl.java       | 23 ++++--
 .../core/server/ActiveMQServerLogger.java       | 13 ----
 .../core/server/files/FileStoreMonitor.java     |  9 +--
 .../core/server/files/FileStoreMonitorTest.java | 10 +++
 .../integration/amqp/GlobalDiskFullTest.java    | 75 ++++++++++++++----
 .../tests/unit/util/FakePagingManager.java      | 12 +--
 12 files changed, 232 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
new file mode 100644
index 0000000..f1f53ce
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+public abstract class AtomicRunnable implements Runnable {
+
+
+   public static Runnable checkAtomic(Runnable run) {
+      if (run instanceof AtomicRunnable) {
+         return run;
+      } else {
+         return new AtomicRunnableWithDelegate(run);
+      }
+   }
+
+   private volatile int ran;
+
+   private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
+      AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");
+
+   @Override
+   public void run() {
+      if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
+         atomicRun();
+      }
+   }
+
+   public abstract void atomicRun();
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
new file mode 100644
index 0000000..d1583de
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+public class AtomicRunnableWithDelegate extends AtomicRunnable {
+
+   private final Runnable runnable;
+
+   public AtomicRunnableWithDelegate(Runnable runnable) {
+      this.runnable = runnable;
+   }
+
+   @Override
+   public void atomicRun() {
+      runnable.run();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 1f5ccbc..86c0687 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -581,9 +581,7 @@ public class AMQPSessionCallback implements SessionCallback {
          Runnable creditRunnable = () -> {
             connection.lock();
             try {
-               if (receiver.getRemoteCredit() <= threshold) {
-                  receiver.flow(credits);
-               }
+               receiver.flow(credits);
             } finally {
                connection.unlock();
             }
@@ -592,10 +590,10 @@ public class AMQPSessionCallback implements SessionCallback {
 
          if (address == null) {
             pagingManager.checkMemory(creditRunnable);
-            return;
+         } else {
+            final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
+            store.checkMemory(creditRunnable);
          }
-         final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
-         store.checkMemory(creditRunnable);
       } catch (Exception e) {
          throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index c8eb2ec..5d8461e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -17,9 +17,6 @@
 package org.apache.activemq.artemis.core.paging;
 
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -82,7 +79,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
 
    void resumeCleanup();
 
-   void addBlockedStore(Blockable store);
+   void addBlockedStore(PagingStore store);
 
    void injectMonitor(FileStoreMonitor monitor) throws Exception;
 
@@ -114,54 +111,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
       return 0;
    }
 
-   boolean checkMemory(Runnable runnable);
-
-   // To be used when the memory is oversized either by local settings or global settings
on blocking addresses
-   final class OverSizedRunnable implements Runnable {
-
-      private final AtomicBoolean ran = new AtomicBoolean(false);
-
-      private final Runnable runnable;
-
-      public OverSizedRunnable(final Runnable runnable) {
-         this.runnable = runnable;
-      }
-
-      @Override
-      public void run() {
-         if (ran.compareAndSet(false, true)) {
-            runnable.run();
-         }
-      }
-   }
-
-   interface Blockable {
-      /**
-       * It will return true if the destination is leaving blocking.
-       */
-      boolean checkReleasedMemory();
-   }
-
-   final class MemoryFreedRunnablesExecutor implements Runnable {
-
-      private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
-
-      public void addRunnable(PagingManager.OverSizedRunnable runnable) {
-         onMemoryFreedRunnables.add(runnable);
-      }
-
-      @Override
-      public void run() {
-         Runnable runnable;
-
-         while ((runnable = onMemoryFreedRunnables.poll()) != null) {
-            runnable.run();
-         }
-      }
-
-      public boolean isEmpty() {
-         return onMemoryFreedRunnables.isEmpty();
-      }
-   }
+   /**
+    * Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
+    * @param runWhenAvailable
+    */
+   void checkMemory(Runnable runWhenAvailable);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 27e8c0f..4dd8bf8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  *
  * @see PagingManager
  */
-public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, PagingManager.Blockable
{
+public interface PagingStore extends ActiveMQComponent, RefCountMessageListener {
 
    SimpleString getAddress();
 
@@ -132,6 +132,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener,
    boolean isRejectingMessages();
 
    /**
+    * It will return true if the destination is leaving blocking.
+    */
+   boolean checkReleasedMemory();
+
+   /**
     * Write lock the PagingStore.
     *
     * @param timeout milliseconds to wait for the lock. If value is {@literal -1} then wait

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 878f918..8893984 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -20,8 +20,10 @@ import java.nio.file.FileStore;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.jboss.logging.Logger;
 
 public final class PagingManagerImpl implements PagingManager {
@@ -57,7 +60,7 @@ public final class PagingManagerImpl implements PagingManager {
     */
    private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
 
-   private final Set<Blockable> blockedStored = new ConcurrentHashSet<>();
+   private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>();
 
    private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>();
 
@@ -75,13 +78,14 @@ public final class PagingManagerImpl implements PagingManager {
 
    private volatile boolean diskFull = false;
 
+   private final Executor memoryExecutor;
+
+   private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
+
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions
= new ConcurrentHashMap<>();
 
    private ActiveMQScheduledComponent scheduledComponent = null;
 
-   private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor
= new PagingManager.MemoryFreedRunnablesExecutor();
-
-   private final Executor executor;
    // Static
    // --------------------------------------------------------------------------------------------------------------------------
 
@@ -106,7 +110,7 @@ public final class PagingManagerImpl implements PagingManager {
       this.addressSettingsRepository = addressSettingsRepository;
       addressSettingsRepository.registerListener(this);
       this.maxSize = maxSize;
-      executor = pagingStoreFactory.newExecutor();
+      this.memoryExecutor = pagingSPI.newExecutor();
    }
 
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
@@ -115,7 +119,7 @@ public final class PagingManagerImpl implements PagingManager {
    }
 
    @Override
-   public void addBlockedStore(Blockable store) {
+   public void addBlockedStore(PagingStore store) {
       blockedStored.add(store);
    }
 
@@ -157,42 +161,18 @@ public final class PagingManagerImpl implements PagingManager {
       return globalSizeBytes.get();
    }
 
-   @Override
-   public boolean checkMemory(final Runnable runWhenAvailable) {
-      if (isGlobalFull()) {
-         OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
-
-         memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
-         addBlockedStore(() -> {
-            if (!isGlobalFull()) {
-               if (!memoryFreedRunnablesExecutor.isEmpty()) {
-                  executor.execute(memoryFreedRunnablesExecutor);
-                  ActiveMQServerLogger.LOGGER.unblockingGlobalMessageProduction(getGlobalSize());
-                  return true;
-               }
-            }
-            return false;
-         });
-
-         if (isDiskFull()) {
-            ActiveMQServerLogger.LOGGER.blockingGlobalDiskFull();
-         } else {
-            ActiveMQServerLogger.LOGGER.blockingGlobalMessageProduction(getGlobalSize());
-         }
-
-         return true;
-      }
-
-      runWhenAvailable.run();
-
-      return true;
-   }
-
    protected void checkMemoryRelease() {
       if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) &&
!blockedStored.isEmpty()) {
-         Iterator<Blockable> storeIterator = blockedStored.iterator();
+         if (!memoryCallback.isEmpty()) {
+            if (memoryExecutor != null) {
+               memoryExecutor.execute(this::memoryReleased);
+            } else {
+               memoryReleased();
+            }
+         }
+         Iterator<PagingStore> storeIterator = blockedStored.iterator();
          while (storeIterator.hasNext()) {
-            Blockable store = storeIterator.next();
+            PagingStore store = storeIterator.next();
             if (store.checkReleasedMemory()) {
                storeIterator.remove();
             }
@@ -223,7 +203,7 @@ public final class PagingManagerImpl implements PagingManager {
 
       @Override
       public void under(FileStore store, double usage) {
-         if (diskFull) {
+         if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) {
             ActiveMQServerLogger.LOGGER.diskCapacityRestored();
             diskFull = false;
             checkMemoryRelease();
@@ -242,6 +222,27 @@ public final class PagingManagerImpl implements PagingManager {
    }
 
    @Override
+   public void checkMemory(final Runnable runWhenAvailable) {
+
+      if (isGlobalFull()) {
+         memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+         return;
+      }
+      runWhenAvailable.run();
+   }
+
+
+   private void memoryReleased() {
+      Runnable runnable;
+
+      while ((runnable = memoryCallback.poll()) != null) {
+         runnable.run();
+      }
+   }
+
+
+
+   @Override
    public boolean isGlobalFull() {
       return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 74212ce..5f0d3c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -23,7 +23,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +62,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.jboss.logging.Logger;
 
 /**
@@ -639,7 +642,16 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
-   private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor
= new PagingManager.MemoryFreedRunnablesExecutor();
+   private final Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
+
+   private void memoryReleased() {
+      Runnable runnable;
+
+      while ((runnable = onMemoryFreedRunnables.poll()) != null) {
+         runnable.run();
+      }
+   }
+
 
    @Override
    public boolean checkMemory(final Runnable runWhenAvailable) {
@@ -650,9 +662,8 @@ public class PagingStoreImpl implements PagingStore {
          }
       } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK
&& (maxSize != -1 || usingGlobalMaxSize)) {
          if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >
maxSize || pagingManager.isGlobalFull()) {
-            PagingManager.OverSizedRunnable ourRunnable = new PagingManager.OverSizedRunnable(runWhenAvailable);
 
-            memoryFreedRunnablesExecutor.addRunnable(ourRunnable);
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
 
             // We check again to avoid a race condition where the size can come down just
after the element
             // has been added, but the check to execute was done before the element was added
@@ -660,7 +671,7 @@ public class PagingStoreImpl implements PagingStore {
             // MUCH better performance in a highly concurrent environment
             if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize
|| maxSize < 0)) {
                // run it now
-               ourRunnable.run();
+               runWhenAvailable.run();
             } else {
                if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
                   pagingManager.addBlockedStore(this);
@@ -719,8 +730,8 @@ public class PagingStoreImpl implements PagingStore {
 
    public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
       if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
-         if (!memoryFreedRunnablesExecutor.isEmpty()) {
-            executor.execute(memoryFreedRunnablesExecutor);
+         if (!onMemoryFreedRunnables.isEmpty()) {
+            executor.execute(this::memoryReleased);
             if (blocking.get()) {
                ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(),
maxSize);
                blocking.set(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 96fffe5..b10d652 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1950,17 +1950,4 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
    void consumerCountError(String reason);
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 224096, value = "Disk Full! Blocking message production. Clients will report
blocked.", format = Message.Format.MESSAGE_FORMAT)
-   void blockingGlobalDiskFull();
-
-   @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 224097, value = "Blocking message production; size is currently: {0} bytes;",
format = Message.Format.MESSAGE_FORMAT)
-   void blockingGlobalMessageProduction(long globalSize);
-
-   @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 224098, value = "Unblocking message production; size is currently: {0} bytes;",
format = Message.Format.MESSAGE_FORMAT)
-   void unblockingGlobalMessageProduction(long globalSize);
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index 957661c..ad59117 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -150,14 +150,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
 
    public interface Callback {
 
-      default void tick(FileStore store, double usage) {
-      }
+      void tick(FileStore store, double usage);
 
-      default void over(FileStore store, double usage) {
-      }
+      void over(FileStore store, double usage);
 
-      default void under(FileStore store, double usage) {
-      }
+      void under(FileStore store, double usage);
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
index e4f27c3..b91d3de 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
@@ -137,6 +137,16 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
             System.out.println("TickS::" + usage);
             latch.countDown();
          }
+
+         @Override
+         public void over(FileStore store, double usage) {
+
+         }
+
+         @Override
+         public void under(FileStore store, double usage) {
+
+         }
       });
       storeMonitor.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
index d664013..0e0f86d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
@@ -25,6 +25,7 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.URI;
@@ -45,6 +46,11 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
       FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
       final CountDownLatch latch = new CountDownLatch(1);
       monitor.addCallback(new FileStoreMonitor.Callback() {
+
+         @Override
+         public void tick(FileStore store, double usage) {
+         }
+
          @Override
          public void over(FileStore store, double usage) {
             latch.countDown();
@@ -53,7 +59,8 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
          public void under(FileStore store, double usage) {
          }
       });
-      latch.await(2, TimeUnit.SECONDS);
+
+      Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
 
       AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
       AmqpConnection connection = addConnection(client.connect());
@@ -61,27 +68,65 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
       try {
          AmqpSession session = connection.createSession();
          AmqpSender sender = session.createSender(getQueueName());
-         final AmqpMessage message = new AmqpMessage();
          byte[] payload = new byte[1000];
-         message.setBytes(payload);
-
-         sender.setSendTimeout(1000);
-         sender.send(message);
 
-         org.apache.activemq.artemis.core.server.Queue queueView = getProxyToQueue(getQueueName());
-         assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
 
          AmqpSender anonSender = session.createSender();
-         final AmqpMessage message1 = new AmqpMessage();
-         message1.setBytes(payload);
-         message1.setAddress(getQueueName());
 
-         anonSender.setSendTimeout(1000);
-         anonSender.send(message1);
+         CountDownLatch sentWithName = new CountDownLatch(1);
+         CountDownLatch sentAnon = new CountDownLatch(1);
+
+         Thread threadWithName = new Thread() {
+            @Override
+            public void run() {
+
+               try {
+                  final AmqpMessage message = new AmqpMessage();
+                  message.setBytes(payload);
+                  sender.setSendTimeout(-1);
+                  sender.send(message);
+               } catch (Exception e) {
+                  e.printStackTrace();
+               } finally {
+                  sentWithName.countDown();
+               }
+            }
+         };
+
+         threadWithName.start();
+
+
+         Thread threadWithAnon = new Thread() {
+            @Override
+            public void run() {
+               try {
+                  final AmqpMessage message = new AmqpMessage();
+                  message.setBytes(payload);
+                  anonSender.setSendTimeout(-1);
+                  message.setAddress(getQueueName());
+                  anonSender.send(message);
+               } catch (Exception e) {
+                  e.printStackTrace();
+               } finally {
+                  sentAnon.countDown();
+               }
+            }
+         };
+
+         threadWithAnon.start();
+
+         Assert.assertFalse("Thread sender should be blocked", sentWithName.await(500, TimeUnit.MILLISECONDS));
+         Assert.assertFalse("Thread sender anonymous should be blocked", sentAnon.await(500,
TimeUnit.MILLISECONDS));
+
+         monitor.setMaxUsage(100.0);
 
-         queueView = getProxyToQueue(getQueueName());
-         assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount());
+         Assert.assertTrue("Thread sender should be released", sentWithName.await(30, TimeUnit.SECONDS));
+         Assert.assertTrue("Thread sender anonymous should be released", sentAnon.await(30,
TimeUnit.SECONDS));
 
+         threadWithName.join(TimeUnit.SECONDS.toMillis(30));
+         threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
+         Assert.assertFalse(threadWithName.isAlive());
+         Assert.assertFalse(threadWithAnon.isAlive());
       } finally {
          connection.close();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index 3431655..94a9d79 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -30,7 +30,12 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 public final class FakePagingManager implements PagingManager {
 
    @Override
-   public void addBlockedStore(Blockable store) {
+   public void addBlockedStore(PagingStore store) {
+
+   }
+
+   @Override
+   public void checkMemory(Runnable runWhenAvailable) {
 
    }
 
@@ -115,11 +120,6 @@ public final class FakePagingManager implements PagingManager {
       return false;
    }
 
-   @Override
-   public boolean checkMemory(Runnable runnable) {
-      return false;
-   }
-
    /*
     * (non-Javadoc)
     * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()


Mime
View raw message