activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [7/7] activemq-artemis git commit: ARTEMIS-581 Implement max disk usage, and global-max-size
Date Tue, 06 Sep 2016 14:08:09 GMT
ARTEMIS-581 Implement max disk usage, and global-max-size

max-disk-usage = the percentage of the disk that may be used before the system blocks
global-max-size = how many bytes of memory messages may use before the broker enters the configured page mode

This also changes the default generated configuration to page mode, as that is more reliable for systems.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4472aa0e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4472aa0e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4472aa0e

Branch: refs/heads/master
Commit: 4472aa0e36b73b776fc173816b13f9d72d3c45b1
Parents: bfc2095
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Sep 2 16:30:44 2016 -0400
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Tue Sep 6 15:07:49 2016 +0100

----------------------------------------------------------------------
 .../artemis/cli/commands/etc/broker.xml         |   16 +-
 .../config/ActiveMQDefaultConfiguration.java    |   19 +
 .../protocol/AbstractRemotingConnection.java    |    4 +
 .../spi/core/protocol/RemotingConnection.java   |    8 +
 .../core/protocol/mqtt/MQTTConnection.java      |    5 +
 .../core/protocol/stomp/StompConnection.java    |    4 +
 .../artemis/core/config/Configuration.java      |   16 +-
 .../core/config/impl/ConfigurationImpl.java     |   39 +
 .../deployers/impl/FileConfigurationParser.java |   12 +
 .../artemis/core/paging/PagingManager.java      |   16 +
 .../artemis/core/paging/PagingStore.java        |    3 +
 .../artemis/core/paging/PagingStoreFactory.java |    3 +
 .../artemis/core/paging/impl/PageSyncTimer.java |    2 +
 .../core/paging/impl/PagingManagerImpl.java     |   93 +-
 .../core/paging/impl/PagingStoreFactoryNIO.java |    8 +-
 .../core/paging/impl/PagingStoreImpl.java       |   95 +-
 .../core/persistence/StorageManager.java        |    4 +
 .../impl/journal/JournalStorageManager.java     |   12 +-
 .../impl/nullpm/NullStorageManager.java         |    6 +
 .../core/server/ActiveMQMessageBundle.java      |    5 +
 .../core/server/ActiveMQScheduledComponent.java |  101 +
 .../core/server/ActiveMQServerLogger.java       |   10 +
 .../core/server/files/FileMoveManager.java      |  218 +
 .../core/server/files/FileStoreMonitor.java     |  127 +
 .../core/server/impl/ActiveMQServerImpl.java    |   39 +-
 .../core/server/impl/FileMoveManager.java       |  218 -
 .../core/server/impl/ServerSessionImpl.java     |   17 +-
 .../core/server/reload/ReloadManagerImpl.java   |   50 +-
 .../resources/schema/artemis-configuration.xsd  |   24 +
 .../core/config/impl/FileConfigurationTest.java |    3 +
 .../core/server/files/FileMoveManagerTest.java  |  346 ++
 .../core/server/files/FileStoreMonitorTest.java |  161 +
 .../core/server/impl/FileMoveManagerTest.java   |  346 --
 .../transaction/impl/TransactionImplTest.java   |    6 +
 .../artemis/tests/util/ActiveMQTestBase.java    |   20 +-
 .../resources/ConfigurationTest-full-config.xml |    3 +
 docs/user-manual/en/configuration-index.md      |    3 +
 docs/user-manual/en/paging.md                   |   15 +-
 .../integration/client/HangConsumerTest.java    |    2 +-
 .../integration/client/LargeMessageTest.java    |    2 +-
 .../integration/client/PagingOrderTest.java     |  714 ---
 .../integration/client/PagingSyncTest.java      |  109 -
 .../tests/integration/client/PagingTest.java    | 5734 ------------------
 .../cluster/failover/BackupSyncJournalTest.java |    2 +-
 .../cluster/failover/FailoverTest.java          |    2 +-
 .../paging/AddressFullLoggingTest.java          |  152 +
 .../integration/paging/GlobalPagingTest.java    |  183 +
 .../integration/paging/PagingOrderTest.java     |  714 +++
 .../integration/paging/PagingSyncTest.java      |  109 +
 .../tests/integration/paging/PagingTest.java    | 5731 +++++++++++++++++
 .../server/AddressFullLoggingTest.java          |  131 -
 .../tests/integration/stomp/StompTest.java      |   42 +
 .../tests/integration/stomp/StompTestBase.java  |    2 +-
 .../storage/PersistMultiThreadTest.java         |    5 +
 .../core/paging/impl/PagingStoreImplTest.java   |    6 +
 .../tests/unit/util/FakePagingManager.java      |   54 +-
 56 files changed, 8383 insertions(+), 7388 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index 520a231..fe28246 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -50,6 +50,17 @@ under the License.
       <journal-pool-files>-1</journal-pool-files>
 ${journal-buffer.settings}
 ${connector-config.settings}
+      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in memory -->
+      <global-max-size>104857600</global-max-size>
+
       <acceptors>
          <!-- Default ActiveMQ Artemis Acceptor.  Multi-protocol adapter.  Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
          <!-- performance tests have shown that openWire performs best with these buffer sizes -->
@@ -78,9 +89,10 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
             <dead-letter-address>jms.queue.DLQ</dead-letter-address>
             <expiry-address>jms.queue.ExpiryQueue</expiry-address>
             <redelivery-delay>0</redelivery-delay>
-            <max-size-bytes>10485760</max-size-bytes>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
             <message-counter-history-day-limit>10</message-counter-history-day-limit>
-            <address-full-policy>BLOCK</address-full-policy>
+            <address-full-policy>PAGE</address-full-policy>
          </address-setting>
       </address-settings>
    </core>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index aecf902..4ba7329 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -429,6 +429,12 @@ public final class ActiveMQDefaultConfiguration {
    // Default period to wait between configuration file checks
    public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000;
 
+   public static final long DEFAULT_GLOBAL_MAX_SIZE = -1;
+
+   public static final int DEFAULT_MAX_DISK_USAGE = 100;
+
+   public static final int DEFAULT_DISK_SCAN = 5000;
+
    /**
     * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
     */
@@ -1144,4 +1150,17 @@ public final class ActiveMQDefaultConfiguration {
    public static long getDefaultConfigurationFileRefreshPeriod() {
       return DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD;
    }
+
+   /** The default global max size. -1 = no global max size. */
+   public static long getDefaultMaxGlobalSize() {
+      return DEFAULT_GLOBAL_MAX_SIZE;
+   }
+
+   public static int getDefaultMaxDiskUsage() {
+      return DEFAULT_MAX_DISK_USAGE;
+   }
+
+   public static int getDefaultDiskScanPeriod() {
+      return DEFAULT_DISK_SCAN;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index c438766..e512adf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -219,4 +219,8 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
       dataReceived = true;
    }
 
+   @Override
+   public boolean isSupportsFlowControl() {
+      return true;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index 0f5abf3..253986f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -196,4 +196,12 @@ public interface RemotingConnection extends BufferHandler {
     * @return
     */
    boolean isSupportReconnect();
+
+   /**
+    * Return true if the protocol supports flow control.
+    * This is because in some cases we may need to hold message producers in cases like disk full.
+    * If the protocol doesn't support it we trash the connection and throw exceptions.
+    * @return
+    */
+   boolean isSupportsFlowControl();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index aa87bd8..31486ff 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -221,4 +221,9 @@ public class MQTTConnection implements RemotingConnection {
    public boolean isSupportReconnect() {
       return false;
    }
+
+   @Override
+   public boolean isSupportsFlowControl() {
+      return false;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 36f440c..b918b75 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -738,4 +738,8 @@ public final class StompConnection implements RemotingConnection {
       //unsupported
    }
 
+   @Override
+   public boolean isSupportsFlowControl() {
+      return false;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index fc38155..e78479a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -561,10 +561,10 @@ public interface Configuration {
     */
    Configuration setJournalCompactMinFiles(int minFiles);
 
-   /** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
+   /** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_FILES}.*/
    int getJournalPoolFiles();
 
-   /** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
+   /** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_FILES}.*/
    Configuration setJournalPoolFiles(int poolSize);
 
 
@@ -968,4 +968,16 @@ public interface Configuration {
 
    Configuration setConfigurationFileRefreshPeriod(long configurationFileRefreshPeriod);
 
+   long getGlobalMaxSize();
+
+   Configuration setGlobalMaxSize(long globalMaxSize);
+
+   int getMaxDiskUsage();
+
+   Configuration setMaxDiskUsage(int maxDiskUsage);
+
+   Configuration setDiskScanPeriod(int diskScanPeriod);
+
+   int getDiskScanPeriod();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 0604c44..51b633f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -246,6 +246,12 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    private long configurationFileRefreshPeriod = ActiveMQDefaultConfiguration.getDefaultConfigurationFileRefreshPeriod();
 
+   private long globalMaxSize = ActiveMQDefaultConfiguration.getDefaultMaxGlobalSize();
+
+   private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
+
+   private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod();
+
    /**
     * Parent folder for all data folders.
     */
@@ -264,6 +270,28 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
+   public int getMaxDiskUsage() {
+      return maxDiskUsage;
+   }
+
+   @Override
+   public ConfigurationImpl setMaxDiskUsage(int maxDiskUsage) {
+      this.maxDiskUsage = maxDiskUsage;
+      return this;
+   }
+
+   @Override
+   public ConfigurationImpl setGlobalMaxSize(long maxSize) {
+      this.globalMaxSize = maxSize;
+      return this;
+   }
+
+   @Override
+   public long getGlobalMaxSize() {
+      return globalMaxSize;
+   }
+
+   @Override
    public ConfigurationImpl setPersistenceEnabled(final boolean enable) {
       persistenceEnabled = enable;
       return this;
@@ -1784,6 +1812,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
       return this;
    }
 
+   @Override
+   public int getDiskScanPeriod() {
+      return diskScanPeriod;
+   }
+
+   @Override
+   public ConfigurationImpl setDiskScanPeriod(int diskScanPeriod) {
+      this.diskScanPeriod = diskScanPeriod;
+      return this;
+   }
+
    /**
     * It will find the right location of a subFolder, related to artemisInstance
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index c77157b..f700e32 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -175,6 +175,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String MAX_QUEUES_NODE_NAME = "max-queues";
 
+   private static final String GLOBAL_MAX_SIZE = "global-max-size";
+
+   private static final String MAX_DISK_USAGE = "max-disk-usage";
+
+   private static final String DISK_SCAN_PERIOD = "disk-scan-period";
+
    // Attributes ----------------------------------------------------
 
    private boolean validateAIO = false;
@@ -282,6 +288,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       config.setConfigurationFileRefreshPeriod(getLong(e, "configuration-file-refresh-period", config.getConfigurationFileRefreshPeriod(), Validators.GT_ZERO));
 
+      config.setGlobalMaxSize(getLong(e, GLOBAL_MAX_SIZE, config.getGlobalMaxSize(), Validators.MINUS_ONE_OR_GT_ZERO));
+
+      config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE));
+
+      config.setDiskScanPeriod(getInteger(e, DISK_SCAN_PERIOD, config.getDiskScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO));
+
       // parsing cluster password
       String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index d55c09f..b70626a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -20,6 +20,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
 
 /**
@@ -78,6 +79,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
 
    void resumeCleanup();
 
+   void addBlockedStore(PagingStore store);
+
+   void injectMonitor(FileStoreMonitor monitor) throws Exception;
+
    /**
     * Lock the manager. This method should not be called during normal PagingManager usage.
     */
@@ -89,4 +94,15 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
     * @see #lock()
     */
    void unlock();
+
+   /** Add size at the global count level.
+    *  if totalSize > globalMaxSize it will return true */
+   PagingManager addSize(int size);
+
+   boolean isUsingGlobalSize();
+
+   boolean isGlobalFull();
+
+   boolean isDiskFull();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 79fb115..a4a41ef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -130,6 +130,9 @@ public interface PagingStore extends ActiveMQComponent {
 
    boolean isRejectingMessages();
 
+   /** It will return true if the destination is leaving blocking. */
+   boolean checkReleasedMemory();
+
    /**
     * Write lock the PagingStore.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
index 7c52c63..942ff80 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 
@@ -43,4 +44,6 @@ public interface PagingStoreFactory {
 
    SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
 
+   void injectMonitor(FileStoreMonitor monitor) throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
index a595721..bf90750 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java
@@ -67,6 +67,8 @@ final class PageSyncTimer {
       ctx.pageSyncLineUp();
       if (!pendingSync) {
          pendingSync = true;
+
+         // this is a single event
          scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
       }
       syncOperations.add(ctx);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 3c76059..a657a22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.artemis.core.paging.impl;
 
+import java.nio.file.FileStore;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -28,8 +31,11 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.jboss.logging.Logger;
 
 public final class PagingManagerImpl implements PagingManager {
@@ -46,14 +52,22 @@ public final class PagingManagerImpl implements PagingManager {
     */
    private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
 
+   private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>();
+
    private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>();
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
    private final PagingStoreFactory pagingStoreFactory;
 
+   private final AtomicLong globalSizeBytes = new AtomicLong(0);
+
+   private final long maxSize;
+
    private volatile boolean cleanupEnabled = true;
 
+   private volatile boolean diskFull = false;
+
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
 
    // Static
@@ -63,10 +77,21 @@ public final class PagingManagerImpl implements PagingManager {
    // --------------------------------------------------------------------------------------------------------------------
 
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
-                            final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
+                            final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                            final long maxSize) {
       pagingStoreFactory = pagingSPI;
       this.addressSettingsRepository = addressSettingsRepository;
       addressSettingsRepository.registerListener(this);
+      this.maxSize = maxSize;
+   }
+
+   public PagingManagerImpl(final PagingStoreFactory pagingSPI,
+                            final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
+      this(pagingSPI, addressSettingsRepository, -1);
+   }
+
+   public void addBlockedStore(PagingStore store) {
+      blockedStored.add(store);
    }
 
    @Override
@@ -82,6 +107,72 @@ public final class PagingManagerImpl implements PagingManager {
    }
 
    @Override
+   public PagingManagerImpl addSize(int size) {
+      globalSizeBytes.addAndGet(size);
+
+      if (size < 0) {
+         checkMemoryRelease();
+      }
+      return this;
+   }
+
+   protected void checkMemoryRelease() {
+      if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
+         Iterator<PagingStore> storeIterator = blockedStored.iterator();
+         while (storeIterator.hasNext()) {
+            PagingStore store = storeIterator.next();
+            if (store.checkReleasedMemory()) {
+               storeIterator.remove();
+            }
+         }
+      }
+   }
+
+   @Override
+   public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+      pagingStoreFactory.injectMonitor(monitor);
+      monitor.addCallback(new LocalMonitor());
+   }
+
+   class LocalMonitor implements FileStoreMonitor.Callback {
+
+      @Override
+      public void tick(FileStore store, double usage) {
+         logger.tracef("Tick from store:: %s, usage at %f", store, usage);
+      }
+
+      @Override
+      public void over(FileStore store, double usage) {
+         if (!diskFull) {
+            ActiveMQServerLogger.LOGGER.diskBeyondCapacity();
+            diskFull = true;
+         }
+      }
+
+      @Override
+      public void under(FileStore store, double usage) {
+         if (diskFull) {
+            ActiveMQServerLogger.LOGGER.diskCapacityRestored();
+            diskFull = false;
+            checkMemoryRelease();
+         }
+      }
+   }
+
+   @Override
+   public boolean isDiskFull() {
+      return diskFull;
+   }
+
+   public boolean isUsingGlobalSize() {
+      return maxSize > 0;
+   }
+
+   public boolean isGlobalFull() {
+      return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize;
+   }
+
+   @Override
    public void disableCleanup() {
       if (!cleanupEnabled) {
          return;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index c95f214..5093033 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -40,7 +40,8 @@ import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
+import org.apache.activemq.artemis.core.server.files.FileMoveManager;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -96,6 +97,11 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
    }
 
    @Override
+   public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+      monitor.addStore(this.directory);
+   }
+
+   @Override
    public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
       return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 7e6cda8..356ea45 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -98,6 +98,8 @@ public class PagingStoreImpl implements PagingStore {
 
    private final PagingManager pagingManager;
 
+   private final boolean usingGlobalMaxSize;
+
    private final Executor executor;
 
    // Bytes consumed by the queue on the memory
@@ -176,6 +178,7 @@ public class PagingStoreImpl implements PagingStore {
 
       this.cursorProvider = storeFactory.newCursorProvider(this, this.storageManager, addressSettings, executor);
 
+      this.usingGlobalMaxSize = pagingManager.isUsingGlobalSize();
    }
 
    /**
@@ -242,7 +245,13 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public long getMaxSize() {
-      return maxSize;
+      if (maxSize < 0) {
+         // if maxSize < 0, we will return 2 pages for depage purposes
+         return pageSize * 2;
+      }
+      else {
+         return maxSize;
+      }
    }
 
    @Override
@@ -626,7 +635,7 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
-   private final Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
+   private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
 
    private class MemoryFreedRunnablesExecutor implements Runnable {
 
@@ -642,13 +651,14 @@ public class PagingStoreImpl implements PagingStore {
 
    private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
 
-   private static final class OurRunnable implements Runnable {
+   // To be used when the memory is oversized either by local settings or global settings on blocking addresses
+   private static final class OverSizedRunnable implements Runnable {
 
       private boolean ran;
 
       private final Runnable runnable;
 
-      private OurRunnable(final Runnable runnable) {
+      private OverSizedRunnable(final Runnable runnable) {
          this.runnable = runnable;
       }
 
@@ -664,9 +674,15 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public boolean checkMemory(final Runnable runWhenAvailable) {
-      if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1) {
-         if (sizeInBytes.get() > maxSize) {
-            OurRunnable ourRunnable = new OurRunnable(runWhenAvailable);
+
+      if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
+         if (isFull()) {
+            return false;
+         }
+      }
+      else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
+         if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) {
+            OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
 
             onMemoryFreedRunnables.add(ourRunnable);
 
@@ -674,23 +690,24 @@ public class PagingStoreImpl implements PagingStore {
             // has been added, but the check to execute was done before the element was added
             // NOTE! We do not fix this race by locking the whole thing, doing this check provides
             // MUCH better performance in a highly concurrent environment
-            if (sizeInBytes.get() <= maxSize) {
+            if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) {
                // run it now
                ourRunnable.run();
             }
-            else if (!blocking.get()) {
-               ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
-               blocking.set(true);
+            else {
+               if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
+                  pagingManager.addBlockedStore(this);
+               }
+
+               if (!blocking.get()) {
+                  ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
+                  blocking.set(true);
+               }
             }
 
             return true;
          }
       }
-      else if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && maxSize != -1) {
-         if (sizeInBytes.get() > maxSize) {
-            return false;
-         }
-      }
 
       runWhenAvailable.run();
 
@@ -699,40 +716,48 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public void addSize(final int size) {
+
+      boolean globalFull = pagingManager.addSize(size).isGlobalFull();
+      long newSize = sizeInBytes.addAndGet(size);
+
       if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-         if (maxSize != -1) {
-            long newSize = sizeInBytes.addAndGet(size);
-
-            if (newSize <= maxSize) {
-               if (!onMemoryFreedRunnables.isEmpty()) {
-                  executor.execute(memoryFreedRunnablesExecutor);
-                  if (blocking.get()) {
-                     ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
-                     blocking.set(false);
-                  }
-               }
-            }
+         if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
+            checkReleaseMemory(globalFull, newSize);
          }
 
          return;
       }
       else if (addressFullMessagePolicy == AddressFullMessagePolicy.PAGE) {
-         final long addressSize = sizeInBytes.addAndGet(size);
-
          if (size > 0) {
-            if (maxSize > 0 && addressSize > maxSize) {
+            if (maxSize != -1 && newSize > maxSize || globalFull) {
                if (startPaging()) {
-                  ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, addressSize, maxSize);
+                  ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, newSize, maxSize);
                }
             }
          }
 
          return;
       }
-      else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-         sizeInBytes.addAndGet(size);
+   }
+
+   @Override
+   public boolean checkReleasedMemory() {
+      return checkReleaseMemory(pagingManager.isGlobalFull(), sizeInBytes.get());
+   }
+
+   public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
+      if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
+         if (!onMemoryFreedRunnables.isEmpty()) {
+            executor.execute(memoryFreedRunnablesExecutor);
+            if (blocking.get()) {
+               ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
+               blocking.set(false);
+               return true;
+            }
+         }
       }
 
+      return false;
    }
 
    @Override
@@ -1073,7 +1098,7 @@ public class PagingStoreImpl implements PagingStore {
    // To be used on isDropMessagesWhenFull
    @Override
    public boolean isFull() {
-      return maxSize > 0 && getAddressSize() > maxSize;
+      return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index d08b84d..7dcd9d3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.impl.JournalLoader;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -413,4 +414,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     * {@link org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl}
     */
    void persistIdGenerator();
+
+
+   void injectMonitor(FileStoreMonitor monitor) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index b50e117..d8c28d2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.jboss.logging.Logger;
 
@@ -68,6 +69,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
    private SequentialFileFactory journalFF;
 
+   private SequentialFileFactory bindingsFF;
+
    SequentialFileFactory largeMessagesFactory;
 
    private Journal originalMessageJournal;
@@ -95,7 +98,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
          throw ActiveMQMessageBundle.BUNDLE.invalidJournal();
       }
 
-      SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
+      bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
 
       Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
 
@@ -726,4 +729,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
          readUnLock();
       }
    }
+
+   @Override
+   public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+      monitor.addStore(journalFF.getDirectory());
+      monitor.addStore(largeMessagesFactory.getDirectory());
+      monitor.addStore(bindingsFF.getDirectory());
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 21a9fd9..f13d2fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
 import org.apache.activemq.artemis.core.server.impl.JournalLoader;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -83,6 +84,11 @@ public class NullStorageManager implements StorageManager {
 
    }
 
+   @Override
+   public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+
+   }
+
    private static final OperationContext dummyContext = new OperationContext() {
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index d7b893f..d416a97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
 import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQIncompatibleClientServerException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
@@ -370,4 +371,8 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 119118, value = "Management method not applicable for current server configuration")
    IllegalStateException methodNotApplicable();
+
+   @Message(id = 119119, value = "Disk Capacity is Low, cannot produce more messages.")
+   ActiveMQIOErrorException diskBeyondLimit();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
new file mode 100644
index 0000000..dadf171
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.logging.Logger;
+
+/** This is for components that are scheduled to run periodically. */
+public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
+
+   private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
+   private final ScheduledExecutorService scheduledExecutorService;
+   private long period;
+   private TimeUnit timeUnit;
+   private ScheduledFuture future;
+
+   public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
+                                     long checkPeriod,
+                                     TimeUnit timeUnit) {
+      this.scheduledExecutorService = scheduledExecutorService;
+      this.period = checkPeriod;
+      this.timeUnit = timeUnit;
+   }
+
+   @Override
+   public synchronized void start() {
+      if (future != null) {
+         return;
+      }
+      if (period >= 0) {
+         future = scheduledExecutorService.scheduleWithFixedDelay(this, period, period, timeUnit);
+      }
+      else {
+         logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
+      }
+   }
+
+   public long getPeriod() {
+      return period;
+   }
+
+   public synchronized ActiveMQScheduledComponent setPeriod(long period) {
+      this.period = period;
+      restartIfNeeded();
+      return this;
+   }
+
+   public TimeUnit getTimeUnit() {
+      return timeUnit;
+   }
+
+   public synchronized ActiveMQScheduledComponent setTimeUnit(TimeUnit timeUnit) {
+      this.timeUnit = timeUnit;
+      restartIfNeeded();
+      return this;
+   }
+
+   @Override
+   public synchronized void stop() {
+      if (future == null) {
+         return; // no big deal
+      }
+
+      future.cancel(false);
+      future = null;
+
+   }
+
+   @Override
+   public synchronized boolean isStarted() {
+      return future != null;
+   }
+
+
+   // this will restart the scheduled component upon changes
+   private void restartIfNeeded() {
+      if (isStarted()) {
+         stop();
+         start();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 7ff773b..7696c01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1242,6 +1242,16 @@ public interface ActiveMQServerLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void impossibleToRouteGrouped();
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222210, value = "Storage usage is beyond max-disk-usage. System will start blocking producers.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void diskBeyondCapacity();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222211, value = "Storage is back to stable now, under max-disk-usage.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void diskCapacityRestored();
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java
new file mode 100644
index 0000000..3e250f6
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.files;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.jboss.logging.Logger;
+
+/**
+ * Used to move files away.
+ * Each time a backup starts its former data will be moved to a backup folder called oldreplica.1, oldreplica.2, ... etc
+ * We may control the maximum number of folders so we remove old ones.
+ */
+public class FileMoveManager {
+   private static final Logger logger = Logger.getLogger(FileMoveManager.class);
+
+   private final File folder;
+   private int maxFolders;
+   public static final String PREFIX = "oldreplica.";
+
+   private static final FilenameFilter isPrefix = new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+         boolean prefixed = name.contains(PREFIX);
+
+         if (prefixed) {
+            try {
+               Integer.parseInt(name.substring(PREFIX.length()));
+            }
+            catch (NumberFormatException e) {
+               // This function is not really used a lot
+               // so I don't really mind about performance here
+               // this is good enough for what we need
+               prefixed = false;
+            }
+         }
+
+         return prefixed;
+      }
+   };
+
+   private static final FilenameFilter notPrefix = new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+         return !isPrefix.accept(dir, name);
+      }
+   };
+
+
+   public FileMoveManager(File folder) {
+      this(folder, -1);
+   }
+
+   public FileMoveManager(File folder, int maxFolders) {
+      this.folder = folder;
+      this.maxFolders = maxFolders;
+   }
+
+   public int getMaxFolders() {
+      return maxFolders;
+   }
+
+   public FileMoveManager setMaxFolders(int maxFolders) {
+      this.maxFolders = maxFolders;
+      return this;
+   }
+
+   public void doMove() throws IOException {
+      String[] files = getFiles();
+
+      if (files == null || files.length == 0) {
+         // if no files, nothing to be done, no backup, no deletes... nothing!
+         return;
+      }
+
+      // Since we will create one folder, we are already taking that one into consideration
+      internalCheckOldFolders(1);
+
+      int whereToMove = getMaxID() + 1;
+
+      File folderTo = getFolder(whereToMove);
+      folderTo.mkdirs();
+
+      ActiveMQServerLogger.LOGGER.backupMovingDataAway(folder.getPath(), folderTo.getPath());
+
+      for (String fileMove : files) {
+         File fileFrom = new File(folder, fileMove);
+         File fileTo = new File(folderTo, fileMove);
+         logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo);
+         Files.move(fileFrom.toPath(), fileTo.toPath());
+      }
+
+   }
+
+   public void checkOldFolders() {
+      internalCheckOldFolders(0);
+   }
+
+   private void internalCheckOldFolders(int creating) {
+      if (maxFolders > 0) {
+         int folders = getNumberOfFolders();
+
+         // We are counting the next one to be created
+         int foldersToDelete = folders + creating - maxFolders;
+
+         if (foldersToDelete > 0) {
+            logger.tracef("There are %d folders to delete", foldersToDelete);
+            int[] ids = getIDlist();
+            for (int i = 0; i < foldersToDelete; i++) {
+               File file = getFolder(ids[i]);
+               ActiveMQServerLogger.LOGGER.removingBackupData(file.getPath());
+               deleteTree(file);
+            }
+         }
+      }
+   }
+
+   /**
+    * It will return non backup folders
+    */
+   public String[] getFiles() {
+      return folder.list(notPrefix);
+   }
+
+
+   public int getNumberOfFolders() {
+      return getFolders().length;
+   }
+
+   public String[] getFolders() {
+      String[] list = folder.list(isPrefix);
+
+      if (list == null) {
+         list = new String[0];
+      }
+
+
+      return list;
+   }
+
+
+   public int getMinID() {
+      int[] list = getIDlist();
+
+      if (list.length == 0) {
+         return 0;
+      }
+
+      return list[0];
+   }
+
+   public int getMaxID() {
+      int[] list = getIDlist();
+
+      if (list.length == 0) {
+         return 0;
+      }
+
+      return list[list.length - 1];
+   }
+
+
+   public int[] getIDlist() {
+      String[] list = getFolders();
+      int[] ids = new int[list.length];
+      for (int i = 0; i < ids.length; i++) {
+         ids[i] = getID(list[i]);
+      }
+
+      Arrays.sort(ids);
+
+      return ids;
+   }
+
+   public int getID(String folderName) {
+      return Integer.parseInt(folderName.substring(PREFIX.length()));
+   }
+
+
+   public File getFolder(int id) {
+      return new File(folder, PREFIX + id);
+   }
+
+
+   private void deleteTree(File file) {
+      File[] files = file.listFiles();
+
+      if (files != null) {
+         for (File fileDelete : files) {
+            deleteTree(fileDelete);
+         }
+      }
+
+      file.delete();
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
new file mode 100644
index 0000000..f75f6c6
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.files;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.jboss.logging.Logger;
+
+/** This will keep a list of fileStores. It will make a comparison on all file stores registered. If any is over the limit,
+ *  all Callbacks will be called with over.
+ *
+ *  For instance: if Large Messages folder is registered on a different folder and it's over capacity,
+ *                the whole system will be waiting for it to be released.
+ *  */
+public class FileStoreMonitor extends ActiveMQScheduledComponent {
+
+   private static final Logger logger = Logger.getLogger(FileStoreMonitor.class);
+
+   private final Set<Callback> callbackList = new HashSet<>();
+   private final Set<FileStore> stores = new HashSet<>();
+   private double maxUsage;
+
+   public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
+                           long checkPeriod,
+                           TimeUnit timeUnit,
+                           double maxUsage) {
+      super(scheduledExecutorService, checkPeriod, timeUnit);
+      this.maxUsage = maxUsage;
+   }
+
+   public synchronized FileStoreMonitor addCallback(Callback callback) {
+      callbackList.add(callback);
+      return this;
+   }
+
+   public synchronized FileStoreMonitor addStore(File file) throws IOException {
+      if (file.exists()) {
+         addStore(Files.getFileStore(file.toPath()));
+      }
+      return this;
+   }
+
+   public synchronized FileStoreMonitor addStore(FileStore store) {
+      stores.add(store);
+      return this;
+   }
+
+
+   public void run() {
+      tick();
+   }
+
+   public synchronized void tick() {
+      boolean over = false;
+
+      FileStore lastStore = null;
+      double usage = 0;
+
+      for (FileStore store : stores) {
+         try {
+            lastStore = store;
+            usage = calculateUsage(store);
+            over = usage  > maxUsage;
+            if (over) {
+               break;
+            }
+         }
+         catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      }
+
+      for (Callback callback : callbackList) {
+         callback.tick(lastStore, usage);
+
+         if (over) {
+            callback.over(lastStore, usage);
+         }
+         else {
+            callback.under(lastStore, usage);
+         }
+      }
+   }
+
+   public double getMaxUsage() {
+      return maxUsage;
+   }
+
+   public FileStoreMonitor setMaxUsage(double maxUsage) {
+      this.maxUsage = maxUsage;
+      return this;
+   }
+
+   protected double calculateUsage(FileStore store) throws IOException {
+      return 1.0 - (double)store.getUsableSpace() / (double)store.getTotalSpace();
+   }
+
+   public interface Callback {
+      void tick(FileStore store, double usage);
+      void over(FileStore store, double usage);
+      void under(FileStore store, double usage);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index dba7e5b..680af8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -123,6 +123,8 @@ import org.apache.activemq.artemis.core.server.cluster.BackupManager;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
+import org.apache.activemq.artemis.core.server.files.FileMoveManager;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
 import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
@@ -249,6 +251,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private ReloadManager reloadManager;
 
+   private FileStoreMonitor fileStoreMonitor;
+
    /**
     * This will be set by the JMS Queue Manager.
     */
@@ -756,6 +760,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
          state = SERVER_STATE.STOPPING;
 
+         if (fileStoreMonitor != null) {
+            fileStoreMonitor.stop();
+            fileStoreMonitor = null;
+         }
+
          activation.sendLiveIsStopping();
 
          stopComponent(connectorsService);
@@ -1277,7 +1286,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                                      SessionCallback callback,
                                                      OperationContext context,
                                                      boolean autoCreateJMSQueues) throws Exception {
-      return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null);
+      return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(),
+                                   xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(),
+                                   defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null,
+                                   pagingManager);
    }
 
    @Override
@@ -1771,8 +1783,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    protected PagingManager createPagingManager() {
-
-      return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository);
+      return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
    }
 
    protected PagingStoreFactoryNIO getPagingStoreFactory() {
@@ -2042,6 +2053,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          // We can only do this after everything is started otherwise we may get nasty races with expired messages
          postOffice.startExpiryScanner();
       }
+
+      try {
+         injectMonitor(new FileStoreMonitor(getScheduledPool(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
+      }
+      catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Installs the given {@link FileStoreMonitor} on this server and starts it.
+    * <p>
+    * This method exists so that test cases can replace the monitor with an extension that,
+    * for instance, pretends the disk is full.
+    * <p>
+    * The monitor is stored on this server, handed to the paging manager and to the storage
+    * manager, and then started.
+    * NOTE(review): a monitor installed earlier is not stopped here - confirm callers do not rely on that.
+    *
+    * @param storeMonitor the monitor to install and start
+    * @throws Exception if handing the monitor to the managers or starting it fails
+    */
+   public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception {
+      this.fileStoreMonitor = storeMonitor;
+      pagingManager.injectMonitor(storeMonitor);
+      storageManager.injectMonitor(storeMonitor);
+      fileStoreMonitor.start();
+   }
+
+   public FileStoreMonitor getMonitor() {
+      return fileStoreMonitor;
    }
 
    public void completeActivation() throws Exception {
@@ -2075,8 +2105,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          addressCount++;
       }
 
+
       long maxMemory = Runtime.getRuntime().maxMemory();
-      if (totalMaxSizeBytes >= maxMemory) {
+      if (totalMaxSizeBytes >= maxMemory && configuration.getGlobalMaxSize() < 0) {
          ActiveMQServerLogger.LOGGER.potentialOOME(addressCount, totalMaxSizeBytes, maxMemory);
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java
deleted file mode 100644
index efe1bb2..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server.impl;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Arrays;
-
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.jboss.logging.Logger;
-
-/**
- * Used to move files away.
- * Each time a backup starts its formeter data will be moved to a backup folder called bkp.1, bkp.2, ... etc
- * We may control the maximum number of folders so we remove old ones.
- */
-public class FileMoveManager {
-   private static final Logger logger = Logger.getLogger(FileMoveManager.class);
-
-   private final File folder;
-   private int maxFolders;
-   public static final String PREFIX = "oldreplica.";
-
-   private static final FilenameFilter isPrefix = new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-         boolean prefixed = name.contains(PREFIX);
-
-         if (prefixed) {
-            try {
-               Integer.parseInt(name.substring(PREFIX.length()));
-            }
-            catch (NumberFormatException e) {
-               // This function is not really used a lot
-               // so I don't really mind about performance here
-               // this is good enough for what we need
-               prefixed = false;
-            }
-         }
-
-         return prefixed;
-      }
-   };
-
-   private static final FilenameFilter notPrefix = new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-         return !isPrefix.accept(dir, name);
-      }
-   };
-
-
-   public FileMoveManager(File folder) {
-      this(folder, -1);
-   }
-
-   public FileMoveManager(File folder, int maxFolders) {
-      this.folder = folder;
-      this.maxFolders = maxFolders;
-   }
-
-   public int getMaxFolders() {
-      return maxFolders;
-   }
-
-   public FileMoveManager setMaxFolders(int maxFolders) {
-      this.maxFolders = maxFolders;
-      return this;
-   }
-
-   public void doMove() throws IOException {
-      String[] files = getFiles();
-
-      if (files == null || files.length == 0) {
-         // if no files, nothing to be done, no backup, no deletes... nothing!
-         return;
-      }
-
-      // Since we will create one folder, we are already taking that one into consideration
-      internalCheckOldFolders(1);
-
-      int whereToMove = getMaxID() + 1;
-
-      File folderTo = getFolder(whereToMove);
-      folderTo.mkdirs();
-
-      ActiveMQServerLogger.LOGGER.backupMovingDataAway(folder.getPath(), folderTo.getPath());
-
-      for (String fileMove : files) {
-         File fileFrom = new File(folder, fileMove);
-         File fileTo = new File(folderTo, fileMove);
-         logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo);
-         Files.move(fileFrom.toPath(), fileTo.toPath());
-      }
-
-   }
-
-   public void checkOldFolders() {
-      internalCheckOldFolders(0);
-   }
-
-   private void internalCheckOldFolders(int creating) {
-      if (maxFolders > 0) {
-         int folders = getNumberOfFolders();
-
-         // We are counting the next one to be created
-         int foldersToDelete = folders + creating - maxFolders;
-
-         if (foldersToDelete > 0) {
-            logger.tracef("There are %d folders to delete", foldersToDelete);
-            int[] ids = getIDlist();
-            for (int i = 0; i < foldersToDelete; i++) {
-               File file = getFolder(ids[i]);
-               ActiveMQServerLogger.LOGGER.removingBackupData(file.getPath());
-               deleteTree(file);
-            }
-         }
-      }
-   }
-
-   /**
-    * It will return non backup folders
-    */
-   public String[] getFiles() {
-      return folder.list(notPrefix);
-   }
-
-
-   public int getNumberOfFolders() {
-      return getFolders().length;
-   }
-
-   public String[] getFolders() {
-      String[] list = folder.list(isPrefix);
-
-      if (list == null) {
-         list = new String[0];
-      }
-
-
-      return list;
-   }
-
-
-   public int getMinID() {
-      int[] list = getIDlist();
-
-      if (list.length == 0) {
-         return 0;
-      }
-
-      return list[0];
-   }
-
-   public int getMaxID() {
-      int[] list = getIDlist();
-
-      if (list.length == 0) {
-         return 0;
-      }
-
-      return list[list.length - 1];
-   }
-
-
-   public int[] getIDlist() {
-      String[] list = getFolders();
-      int[] ids = new int[list.length];
-      for (int i = 0; i < ids.length; i++) {
-         ids[i] = getID(list[i]);
-      }
-
-      Arrays.sort(ids);
-
-      return ids;
-   }
-
-   public int getID(String folderName) {
-      return Integer.parseInt(folderName.substring(PREFIX.length()));
-   }
-
-
-   public File getFolder(int id) {
-      return new File(folder, PREFIX + id);
-   }
-
-
-   private void deleteTree(File file) {
-      File[] files = file.listFiles();
-
-      if (files != null) {
-         for (File fileDelete : files) {
-            deleteTree(fileDelete);
-         }
-      }
-
-      file.delete();
-   }
-
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 893b615..d20fa43 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -47,6 +48,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -128,6 +130,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    protected boolean xa;
 
+   protected final PagingManager pagingManager;
+
    protected final StorageManager storageManager;
 
    private final ResourceManager resourceManager;
@@ -199,7 +203,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final SimpleString defaultAddress,
                             final SessionCallback callback,
                             final OperationContext context,
-                            final QueueCreator queueCreator) throws Exception {
+                            final QueueCreator queueCreator,
+                            final PagingManager pagingManager) throws Exception {
       this.username = username;
 
       this.password = password;
@@ -224,6 +229,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       this.securityStore = securityStore;
 
+      this.pagingManager = pagingManager;
+
       timeoutSeconds = resourceManager.getTimeoutSeconds();
       this.xa = xa;
 
@@ -1249,6 +1256,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
+
+      // If the protocol doesn't support flow control, we have no choice other than fail the communication
+      if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
+         ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
+         this.getRemotingConnection().fail(exception);
+         throw exception;
+      }
+
       RoutingStatus result = RoutingStatus.OK;
       //large message may come from StompSession directly, in which
       //case the id header already generated.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
index a735b93..7686ac5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
@@ -24,33 +24,25 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.jboss.logging.Logger;
 
-public class ReloadManagerImpl implements ReloadManager {
+public class ReloadManagerImpl extends ActiveMQScheduledComponent implements ReloadManager  {
    private static final Logger logger = Logger.getLogger(ReloadManagerImpl.class);
 
-   private final ScheduledExecutorService scheduledExecutorService;
-   private final long checkPeriod;
-   private ScheduledFuture future;
    private volatile Runnable tick;
 
    private Map<URL, ReloadRegistry> registry = new HashMap<>();
 
    public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, long checkPeriod) {
-      this.scheduledExecutorService = scheduledExecutorService;
-      this.checkPeriod = checkPeriod;
+      super(scheduledExecutorService, checkPeriod, TimeUnit.MILLISECONDS);
    }
 
-   @Override
-   public synchronized void start() {
-      if (future != null) {
-         return;
-      }
-      future = scheduledExecutorService.scheduleWithFixedDelay(new ConfigurationFileReloader(), checkPeriod, checkPeriod, TimeUnit.MILLISECONDS);
+   public void run() {
+      tick();
    }
 
    @Override
@@ -59,24 +51,8 @@ public class ReloadManagerImpl implements ReloadManager {
    }
 
    @Override
-   public synchronized void stop() {
-      if (future == null) {
-         return; // no big deal
-      }
-
-      future.cancel(false);
-      future = null;
-
-   }
-
-   @Override
-   public synchronized boolean isStarted() {
-      return future != null;
-   }
-
-   @Override
    public synchronized void addCallback(URL uri, ReloadCallback callback) {
-      if (future == null) {
+      if (!isStarted()) {
          start();
       }
       ReloadRegistry uriRegistry = getRegistry(uri);
@@ -104,20 +80,6 @@ public class ReloadManagerImpl implements ReloadManager {
       return uriRegistry;
    }
 
-
-
-   private final class ConfigurationFileReloader implements Runnable {
-      @Override
-      public void run() {
-         try {
-            tick();
-         }
-         catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.configurationReloadFailed(e);
-         }
-      }
-   }
-
    class ReloadRegistry {
       private final File file;
       private final URL uri;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 56c5e26..4d4abd7 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -676,6 +676,30 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="global-max-size" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Global maximum size, in bytes, of messages held in memory; once it is reached, every address applies its configured address-full policy as messages are produced.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
+         <xsd:element name="max-disk-usage" type="xsd:int" default="90" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Max percentage of disk usage before the system blocks or fails clients.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
+         <xsd:element name="disk-scan-period" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  How often (in ms) to scan the disks to check whether they are full.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="memory-warning-threshold" type="xsd:int" default="25" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index ba2cb7f..6045c8d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -356,6 +356,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertFalse(a2Role.isCreateNonDurableQueue());
       assertTrue(a2Role.isDeleteNonDurableQueue());
       assertFalse(a2Role.isManage());
+      assertEquals(1234567, conf.getGlobalMaxSize());
+      assertEquals(37, conf.getMaxDiskUsage());
+      assertEquals(123, conf.getDiskScanPeriod());
    }
 
    @Test


Mime
View raw message