activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6547 - add storeUsage attribute to mKahaDB filtered adapter so that disk utilisation is available as a QoS measure
Date Tue, 20 Dec 2016 14:47:49 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 04626d807 -> 0ae90c089


https://issues.apache.org/jira/browse/AMQ-6547 - add storeUsage attribute to mKahaDB filtered
adapter so that disk utilisation is available as a QoS measure


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0ae90c08
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0ae90c08
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0ae90c08

Branch: refs/heads/master
Commit: 0ae90c0897835ec9a4f900ec36101350f004fd9c
Parents: 04626d8
Author: gtully <gary.tully@gmail.com>
Authored: Tue Dec 20 14:47:22 2016 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Dec 20 14:47:22 2016 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java |   4 +
 .../activemq/usage/PercentLimitUsage.java       |   6 +
 .../org/apache/activemq/usage/StoreUsage.java   |  10 +
 .../FilteredKahaDBPersistenceAdapter.java       |  15 +-
 .../store/kahadb/KahaDBPersistenceAdapter.java  |   2 +-
 .../kahadb/MultiKahaDBPersistenceAdapter.java   |  19 +-
 .../kahadb/MultiKahaDBTransactionStore.java     |  25 +++
 .../activemq/store/MKahaDBConfigTest.java       | 102 ++++++++++
 .../org/apache/activemq/store/mKahaDB.xml       |  60 ++++++
 .../store/kahadb/MKahaDBStoreLimitTest.java     | 189 +++++++++++++++++++
 10 files changed, 421 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 75f2ee0..26a8ccc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -880,4 +880,8 @@ public abstract class BaseDestination implements Destination {
     public boolean isPersistJMSRedelivered() {
         return persistJMSRedelivered;
     }
+
+    public SystemUsage getSystemUsage() {
+        return systemUsage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java
b/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java
index a72dece..20a47aa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java
@@ -54,6 +54,12 @@ public abstract class PercentLimitUsage <T extends Usage> extends
Usage<T> {
         }
     }
 
+    /**
+     * Sets the total available space in bytes. When non zero, the filesystem totalAvailableSpace is ignored.
+     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
+     *
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     */
     public void setTotal(long max) {
         this.total = max;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
index a2cd0fd..928da83 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java
@@ -98,4 +98,14 @@ public class StoreUsage extends PercentLimitUsage<StoreUsage> {
             usageLock.writeLock().unlock();
         }
     }
+
+    public StoreUsage copy() {
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.name = name;
+        storeUsage.parent = parent;
+        storeUsage.total = total;
+        storeUsage.percentLimit = percentLimit;
+        storeUsage.getLimiter().setLimit(getLimiter().getLimit());
+        return storeUsage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
index f994c67..7837345 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
@@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.usage.StoreUsage;
 
 /**
  * @org.apache.xbean.XBean element="filteredKahaDB"
@@ -27,14 +28,18 @@ import org.apache.activemq.store.PersistenceAdapter;
 public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
     private PersistenceAdapter persistenceAdapter;
     private boolean perDestination;
+    private StoreUsage usage;
 
     public FilteredKahaDBPersistenceAdapter() {
         super();
     }
 
-    public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, PersistenceAdapter
adapter) {
+    public FilteredKahaDBPersistenceAdapter(FilteredKahaDBPersistenceAdapter template, ActiveMQDestination
destination, PersistenceAdapter adapter) {
         setDestination(destination);
         persistenceAdapter  = adapter;
+        if (template.getUsage() != null) {
+            usage = template.getUsage().copy();
+        }
     }
 
     public PersistenceAdapter getPersistenceAdapter() {
@@ -65,4 +70,12 @@ public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry
{
         }
         return super.compareTo(that);
     }
+
+    public void setUsage(StoreUsage usage) {
+        this.usage = usage;
+    }
+
+    public StoreUsage getUsage() {
+        return usage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index fd77e49..1740f6b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -214,7 +214,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
      */
     @Override
     public long size() {
-        return this.letter.size();
+        return this.letter.isStarted() ? this.letter.size() : 0l;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 223afb9..4fa6b3d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -56,6 +56,7 @@ import org.apache.activemq.store.TransactionIdTransformer;
 import org.apache.activemq.store.TransactionIdTransformerAware;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -391,26 +392,26 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
     }
 
     private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter,
File candidate) throws IOException {
-        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
candidate.getName());
+        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, candidate.getName());
         startAdapter(adapter, candidate.getName());
         Set<ActiveMQDestination> destinations = adapter.getDestinations();
         if (destinations.size() != 0) {
-            registerAdapter(adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
+            registerAdapter(filteredAdapter, adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
         } else {
             stopAdapter(adapter, candidate.getName());
         }
     }
 
     private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter
filteredAdapter, ActiveMQDestination destination) throws IOException {
-        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
nameFromDestinationFilter(destination));
-        return registerAdapter(adapter, destination);
+        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter, nameFromDestinationFilter(destination));
+        return registerAdapter(filteredAdapter, adapter, destination);
     }
 
-    private PersistenceAdapter adapterFromTemplate(PersistenceAdapter template, String destinationName)
throws IOException {
-        PersistenceAdapter adapter = kahaDBFromTemplate(template);
+    private PersistenceAdapter adapterFromTemplate(FilteredKahaDBPersistenceAdapter template,
String destinationName) throws IOException {
+        PersistenceAdapter adapter = kahaDBFromTemplate(template.getPersistenceAdapter());
         configureAdapter(adapter);
         configureDirectory(adapter, destinationName);
-        configureIndexDirectory(adapter, template, destinationName);
+        configureIndexDirectory(adapter, template.getPersistenceAdapter(), destinationName);
         return adapter;
     }
 
@@ -449,9 +450,9 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         adapter.setDirectory(directory);
     }
 
-    private FilteredKahaDBPersistenceAdapter registerAdapter(PersistenceAdapter adapter,
ActiveMQDestination destination) {
+    private FilteredKahaDBPersistenceAdapter registerAdapter(FilteredKahaDBPersistenceAdapter
template, PersistenceAdapter adapter, ActiveMQDestination destination) {
         adapters.add(adapter);
-        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination,
adapter);
+        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(template,
destination, adapter);
         destinationMap.put(destination, result);
         return result;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 21d00c0..ff70076 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -28,12 +28,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.ListenableFuture;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -48,6 +50,7 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOHelper;
@@ -99,6 +102,28 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws
IOException {
                 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context,
getDelegate(), ack);
             }
+
+            @Override
+            public void registerIndexListener(IndexListener indexListener) {
+                getDelegate().registerIndexListener(indexListener);
+                try {
+                    if (indexListener instanceof BaseDestination) {
+                        // update queue storeUsage
+                        Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination());
+                        if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter)
{
+                            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter)
matchingPersistenceAdapter;
+                            if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter()
instanceof KahaDBPersistenceAdapter) {
+                                StoreUsage storeUsage = filteredAdapter.getUsage();
+                                storeUsage.setStore(filteredAdapter.getPersistenceAdapter());
+                                storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage());
+                                ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage);
+                            }
+                        }
+                    }
+                } catch (Exception ignored) {
+                    LOG.warn("Failed to set mKahaDB destination store usage", ignored);
+                }
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-spring/src/test/java/org/apache/activemq/store/MKahaDBConfigTest.java
----------------------------------------------------------------------
diff --git a/activemq-spring/src/test/java/org/apache/activemq/store/MKahaDBConfigTest.java
b/activemq-spring/src/test/java/org/apache/activemq/store/MKahaDBConfigTest.java
new file mode 100644
index 0000000..a3cfc6b
--- /dev/null
+++ b/activemq-spring/src/test/java/org/apache/activemq/store/MKahaDBConfigTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+import java.io.File;
+
+public class MKahaDBConfigTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MKahaDBConfigTest.class);
+
+    /*
+     * This tests configuring the different broker properties using
+     * xbeans-spring
+     */
+    public void testBrokerConfig() throws Exception {
+        BrokerService broker;
+
+        broker = createBroker("org/apache/activemq/store/mKahaDB.xml");
+        LOG.info("Success");
+
+        try {
+            assertEquals("Broker Config Error (brokerName)", "brokerConfigTest", broker.getBrokerName());
+            assertEquals("Broker Config Error (populateJMSXUserID)", false, broker.isPopulateJMSXUserID());
+            assertEquals("Broker Config Error (persistent)", true, broker.isPersistent());
+            LOG.info("Success");
+
+            SystemUsage systemUsage = broker.getSystemUsage();
+            assertTrue("Should have a SystemUsage", systemUsage != null);
+            assertEquals("SystemUsage Config Error (StoreUsage.limit)", 1 * 1024 * 1024 *
1024, systemUsage.getStoreUsage().getLimit());
+            assertEquals("SystemUsage Config Error (StoreUsage.name)", "foo", systemUsage.getStoreUsage().getName());
+
+            assertNotNull(systemUsage.getStoreUsage().getStore());
+            assertTrue(systemUsage.getStoreUsage().getStore() instanceof MultiKahaDBPersistenceAdapter);
+
+            LOG.info("Success");
+
+            broker.getAdminView().addQueue("A.B");
+
+            BaseDestination queue = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(new
ActiveMQQueue("A.B"));
+            assertTrue(queue.getSystemUsage().getStoreUsage().getStore() instanceof KahaDBPersistenceAdapter);
+            assertEquals(50*1024*1024, queue.getSystemUsage().getStoreUsage().getLimit());
+
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    protected static void recursiveDelete(File file) {
+        if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                recursiveDelete(files[i]);
+            }
+        }
+        file.delete();
+    }
+
+    protected BrokerService createBroker(String resource) throws Exception {
+        return createBroker(new ClassPathResource(resource));
+    }
+
+    protected BrokerService createBroker(Resource resource) throws Exception {
+        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+
+        BrokerService broker = factory.getBroker();
+
+        assertTrue("Should have a broker!", broker != null);
+
+        return broker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-spring/src/test/resources/org/apache/activemq/store/mKahaDB.xml
----------------------------------------------------------------------
diff --git a/activemq-spring/src/test/resources/org/apache/activemq/store/mKahaDB.xml b/activemq-spring/src/test/resources/org/apache/activemq/store/mKahaDB.xml
new file mode 100644
index 0000000..cd42d68
--- /dev/null
+++ b/activemq-spring/src/test/resources/org/apache/activemq/store/mKahaDB.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
+    <amq:broker brokerName="brokerConfigTest">
+
+        <amq:persistenceAdapter>
+            <amq:mKahaDB directory = "target/activemq-data">
+                <amq:filteredPersistenceAdapters>
+                    <amq:filteredKahaDB perDestination="true">
+                        <amq:usage>
+                            <amq:storeUsage percentLimit="10" total="500 mb"></amq:storeUsage>
+                        </amq:usage>
+                        <amq:persistenceAdapter>
+                            <amq:kahaDB journalMaxFileLength="10mb"/>
+                        </amq:persistenceAdapter>
+                    </amq:filteredKahaDB>
+                </amq:filteredPersistenceAdapters>
+            </amq:mKahaDB>
+
+        </amq:persistenceAdapter>
+
+        <amq:systemUsage>
+            <amq:systemUsage>
+                <amq:memoryUsage>
+                    <amq:memoryUsage limit="10 mb" percentUsageMinDelta="20"/>
+                </amq:memoryUsage>
+                <amq:storeUsage>
+                    <amq:storeUsage limit="1 gb" name="foo"/>
+                </amq:storeUsage>
+                <amq:tempUsage>
+                    <amq:tempUsage limit="100 mb"/>
+                </amq:tempUsage>
+            </amq:systemUsage>
+        </amq:systemUsage>
+
+    </amq:broker>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ae90c08/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
new file mode 100644
index 0000000..0ce5795
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.usage.StoreUsage;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class MKahaDBStoreLimitTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MKahaDBStoreLimitTest.class);
+
+    final ActiveMQQueue queueA = new ActiveMQQueue("Q.A");
+    final ActiveMQQueue queueB = new ActiveMQQueue("Q.B");
+
+    private BrokerService broker;
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    private BrokerService createBroker(MultiKahaDBPersistenceAdapter persistenceAdapter)
throws Exception {
+        broker = new BrokerService();
+        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setSchedulerSupport(false);
+        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.setDeleteAllMessagesOnStartup(true);
+        return broker;
+    }
+
+    @Test
+    public void testPerDestUsage() throws Exception {
+
+        // setup multi-kaha adapter
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024 * 5);
+        kahaStore.setCleanupInterval(1000);
+
+        // set up a store per destination
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setPercentLimit(10);
+        storeUsage.setTotal(1024*1024*10);
+        filtered.setUsage(storeUsage);
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+
+        createBroker(persistenceAdapter).start();
+
+
+
+        produceMessages(queueA, 20);
+        produceMessages(queueB, 0);
+
+        LOG.info("Store global u: " + broker.getSystemUsage().getStoreUsage().getUsage()
+ ", %:" + broker.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        assertTrue("some usage", broker.getSystemUsage().getStoreUsage().getUsage() >
0);
+
+        BaseDestination baseDestinationA = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(queueA);
+        BaseDestination baseDestinationB = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(queueB);
+
+        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage()
+ ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        assertTrue(baseDestinationA.getSystemUsage().getStoreUsage().getUsage() > 0);
+
+        produceMessages(queueB, 40);
+        assertTrue(baseDestinationB.getSystemUsage().getStoreUsage().getUsage() > 0);
+        assertTrue(baseDestinationB.getSystemUsage().getStoreUsage().getUsage() > baseDestinationA.getSystemUsage().getStoreUsage().getUsage());
+
+        LOG.info("Store B u: " + baseDestinationB.getSystemUsage().getStoreUsage().getUsage()
+ ", %: " + baseDestinationB.getSystemUsage().getStoreUsage().getPercentUsage());
+        LOG.info("Store global u: " + broker.getSystemUsage().getStoreUsage().getUsage()
+ ", %:" + broker.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        consume(queueA);
+
+        consume(queueB);
+
+        LOG.info("Store global u: " + broker.getSystemUsage().getStoreUsage().getUsage()
+ ", %:" + broker.getSystemUsage().getStoreUsage().getPercentUsage());
+        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage()
+ ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
+        LOG.info("Store B u: " + baseDestinationB.getSystemUsage().getStoreUsage().getUsage()
+ ", %: " + baseDestinationB.getSystemUsage().getStoreUsage().getPercentUsage());
+
+    }
+
+    @Test
+    public void testExplicitAdapter() throws Exception {
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024*25);
+
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setPercentLimit(50);
+        storeUsage.setTotal(512*1024);
+
+        filtered.setUsage(storeUsage);
+        filtered.setDestination(queueA);
+        filtered.setPersistenceAdapter(kahaStore);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+
+        BrokerService brokerService = createBroker(persistenceAdapter);
+        brokerService.getSystemUsage().getStoreUsage().setTotal(1024*1024);
+        brokerService.start();
+
+
+        produceMessages(queueA, 20);
+
+        LOG.info("Store global u: " + broker.getSystemUsage().getStoreUsage().getUsage()
+ ", %:" + broker.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        assertTrue("some usage", broker.getSystemUsage().getStoreUsage().getUsage() >
0);
+
+        BaseDestination baseDestinationA = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(queueA);
+        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage()
+ ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
+
+        assertTrue("limited store has more % usage than parent", baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage()
> broker.getSystemUsage().getStoreUsage().getPercentUsage());
+
+    }
+
+
+    private void consume(Destination queue) throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        for (int i = 0; i < 5; ++i) {
+            assertNotNull("message[" + i + "]", consumer.receive(4000));
+        }
+        connection.close();
+    }
+
+    private void produceMessages(Destination queue, int count) throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+        BytesMessage bytesMessage = session.createBytesMessage();
+        bytesMessage.writeBytes(new byte[2*1024]);
+        for (int i = 0; i < count; ++i) {
+            producer.send(bytesMessage);
+        }
+        connection.close();
+    }
+}


Mime
View raw message