activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2888 ARTEMIS-2859 ARTEMIS-2768 - new page-store-name addressSetting to allow wildcard subscriptions share a single page store
Date Thu, 24 Sep 2020 09:15:40 GMT
This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new fa04881  ARTEMIS-2888 ARTEMIS-2859 ARTEMIS-2768 - new page-store-name addressSetting to allow wildcard subscriptions share a single page store
     new 622acf1  Merge pull request #3265 from gtully/ARTEMIS-2888
fa04881 is described below

commit fa04881c6f6e44e4de8b2b70a0be181185d1e1ca
Author: gtully <gary.tully@gmail.com>
AuthorDate: Wed Sep 16 17:56:12 2020 +0100

    ARTEMIS-2888 ARTEMIS-2859 ARTEMIS-2768 - new page-store-name addressSetting to allow wildcard subscriptions share a single page store
---
 .../artemis/api/core/QueueConfiguration.java       |   9 +
 .../deployers/impl/FileConfigurationParser.java    |   3 +
 .../core/paging/impl/PagingManagerImpl.java        |  15 +-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |   2 -
 .../core/server/impl/QueueConfigurationUtils.java  |   1 +
 .../artemis/core/server/impl/QueueFactoryImpl.java |   2 +-
 .../core/settings/impl/AddressSettings.java        |  31 ++-
 .../resources/schema/artemis-configuration.xsd     |  10 +-
 .../core/config/impl/FileConfigurationTest.java    |   2 +
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 ...rationTest-xinclude-config-address-settings.xml |   1 +
 docs/user-manual/en/address-model.md               |  32 +++
 docs/user-manual/en/wildcard-routing.md            |  13 +
 .../features/standard/topic-hierarchies/pom.xml    |   5 +-
 .../features/standard/topic-hierarchies/readme.md  |  10 +
 .../src/main/resources/activemq/server0/broker.xml |  68 +++++
 .../tests/integration/jms/client/WildcardTest.java | 274 +++++++++++++++++++++
 .../mqtt/MqttWildCardSubAutoCreateTest.java        | 234 ++++++++++++++++++
 .../integration/mqtt/imported/MQTTTestSupport.java |   4 +-
 .../integration/paging/PagingSizeWildcardTest.java | 141 +++++++++++
 .../tests/integration/paging/PagingTest.java       |  86 +++++++
 21 files changed, 932 insertions(+), 12 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
index 2257721..1166413 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
@@ -109,6 +109,7 @@ public class QueueConfiguration implements Serializable {
    private Boolean internal;
    private Boolean _transient;
    private Boolean autoCreated;
+   private transient SimpleString pageStoreName;
 
    /**
     * Instantiate this object and invoke {@link #setName(SimpleString)}
@@ -877,4 +878,12 @@ public class QueueConfiguration implements Serializable {
          + ", transient=" + _transient
          + ", autoCreated=" + autoCreated + ']';
    }
+
+   public void setPageStoreName(SimpleString pageStoreName) {
+      this.pageStoreName = pageStoreName;
+   }
+
+   public SimpleString getPageStoreName() {
+      return pageStoreName != null ? pageStoreName : getAddress();
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 1a6f3d0..6213d75 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -296,6 +296,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String ENABLE_METRICS = "enable-metrics";
 
+   private static final String PAGE_STORE_NAME = "page-store-name";
 
    // Attributes ----------------------------------------------------
 
@@ -1266,6 +1267,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
          } else if (ENABLE_METRICS.equalsIgnoreCase(name)) {
             addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
+         } else if (PAGE_STORE_NAME.equalsIgnoreCase(name)) {
+            addressSettings.setPageStoreName(new SimpleString(getTrimmedTextContent(child)));
          }
       }
       return setting;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 4f930b7..23828bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -336,6 +336,14 @@ public final class PagingManagerImpl implements PagingManager {
 
    @Override
    public void deletePageStore(final SimpleString storeName) throws Exception {
+      final AddressSettings addressSettings = addressSettingsRepository.getMatch(storeName.toString());
+      if (addressSettings != null && addressSettings.getPageStoreName() != null) {
+         if (logger.isTraceEnabled()) {
+            logger.tracev("not deleting potentially shared pageAddress {} match for {}", addressSettings.getPageStoreName(), storeName);
+         }
+         return;
+      }
+
       syncLock.readLock().lock();
       try {
          PagingStore store = stores.remove(storeName);
@@ -352,11 +360,16 @@ public final class PagingManagerImpl implements PagingManager {
     * This method creates a new store if not exist.
     */
    @Override
-   public PagingStore getPageStore(final SimpleString storeName) throws Exception {
+   public PagingStore getPageStore(SimpleString storeName) throws Exception {
       if (managementAddress != null && storeName.startsWith(managementAddress)) {
          return null;
       }
 
+      final AddressSettings addressSettings = addressSettingsRepository.getMatch(storeName.toString());
+      if (addressSettings != null && addressSettings.getPageStoreName() != null) {
+         storeName = addressSettings.getPageStoreName();
+      }
+
       PagingStore store = stores.get(storeName);
       if (store != null) {
          return store;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index bc036ad..6fb4797 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -844,8 +844,6 @@ public class PagingStoreImpl implements PagingStore {
                return false;
             }
 
-            message.setAddress(address);
-
             final long transactionID = tx == null ? -1 : tx.getID();
             PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
index 5bbf7c2..6f9788e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
@@ -46,5 +46,6 @@ public class QueueConfigurationUtils {
       config.setAutoDeleteMessageCount(config.getAutoDeleteMessageCount() == null ? as.getAutoDeleteQueuesMessageCount() : config.getAutoDeleteMessageCount());
 
       config.setEnabled(config.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : config.isEnabled());
+      config.setPageStoreName(as.getPageStoreName());
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 13e3c31..9b616ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -140,7 +140,7 @@ public class QueueFactoryImpl implements QueueFactory {
       PageSubscription pageSubscription;
 
       try {
-         PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getAddress());
+         PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getPageStoreName());
          if (pageStore != null) {
             pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), FilterImpl.createFilter(queueConfiguration.getFilterString()), queueConfiguration.isDurable());
          } else {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index e713b08..65da14b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -253,6 +253,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Boolean enableMetrics = null;
 
+   private SimpleString pageStoreName = null;
+
    //from amq5
    //make it transient
    private transient Integer queuePrefetch = null;
@@ -318,6 +320,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.defaultGroupFirstKey = other.defaultGroupFirstKey;
       this.defaultRingSize = other.defaultRingSize;
       this.enableMetrics = other.enableMetrics;
+      this.pageStoreName = other.pageStoreName;
    }
 
    public AddressSettings() {
@@ -914,6 +917,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public SimpleString getPageStoreName() {
+      return pageStoreName;
+   }
+
+   public AddressSettings setPageStoreName(final SimpleString pageStoreName) {
+      this.pageStoreName = pageStoreName;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -1107,6 +1119,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (enableMetrics == null) {
          enableMetrics = merged.enableMetrics;
       }
+      if (pageStoreName == null) {
+         pageStoreName = merged.pageStoreName;
+      }
    }
 
    @Override
@@ -1320,6 +1335,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
       }
 
+      if (buffer.readableBytes() > 0) {
+         pageStoreName = buffer.readNullableSimpleString();
+      }
+
    }
 
    @Override
@@ -1383,7 +1402,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          SimpleString.sizeofNullableString(expiryQueuePrefix) +
          SimpleString.sizeofNullableString(expiryQueueSuffix) +
          BufferHelper.sizeOfNullableBoolean(enableMetrics) +
-         BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch);
+         BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
+         SimpleString.sizeofNullableString(pageStoreName);
    }
 
    @Override
@@ -1510,6 +1530,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
       BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch);
 
+      buffer.writeNullableSimpleString(pageStoreName);
    }
 
    /* (non-Javadoc)
@@ -1581,6 +1602,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode());
       result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode());
       result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
+      result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode());
       return result;
    }
 
@@ -1928,6 +1950,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       } else if (!enableMetrics.equals(other.enableMetrics))
          return false;
 
+      if (pageStoreName == null) {
+         if (other.pageStoreName != null)
+            return false;
+      } else if (!pageStoreName.equals(other.pageStoreName))
+         return false;
+
       return true;
    }
 
@@ -2057,6 +2085,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          expiryQueueSuffix +
          ", enableMetrics=" +
          enableMetrics +
+         ", pageAddress=" + pageStoreName +
          "]";
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 7b99be9..0e0ea15 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3687,13 +3687,21 @@
                   </xsd:documentation>
                </xsd:annotation>
             </xsd:element>
+
+            <xsd:element name="page-store-name" type="xsd:string" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the name of the page store to use, to allow the page store to coalesce for address hierarchies when wildcard routing is in play
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
             
          </xsd:all>
 
          <xsd:attribute name="match" type="xsd:string" use="required">
             <xsd:annotation>
                <xsd:documentation>
-                  pattern for matching settings against addresses; can use wildards
+                  pattern for matching settings against addresses; can use wildcards
                </xsd:documentation>
             </xsd:annotation>
          </xsd:attribute>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 583b450..cab5752 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -378,6 +378,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
       assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
       assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics());
+      assertNull("none fonfigured", conf.getAddressesSettings().get("a1").getPageStoreName());
+      assertEquals(new SimpleString("a2.shared"), conf.getAddressesSettings().get("a2").getPageStoreName());
 
       assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
       assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 0bde4f2..1075602 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -481,6 +481,7 @@
             <default-consumer-window-size>10000</default-consumer-window-size>
             <retroactive-message-count>10</retroactive-message-count>
             <enable-metrics>false</enable-metrics>
+            <page-store-name>a2.shared</page-store-name>
          </address-setting>
       </address-settings>
       <resource-limit-settings>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 5be0f08..83267f3 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -78,5 +78,6 @@
       <default-consumer-window-size>10000</default-consumer-window-size>
       <retroactive-message-count>10</retroactive-message-count>
       <enable-metrics>false</enable-metrics>
+      <page-store-name>a2.shared</page-store-name>
    </address-setting>
 </address-settings>
\ No newline at end of file
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index 64e9acc..369e6fd 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -781,6 +781,38 @@ the client-side. If the value is `BLOCK` then client message producers will
 block when they try and send further messages.  See the [Flow
 Control](flow-control.md) and [Paging](paging.md) chapters for more info.
 
+`page-store-name` defines the name of the shared page store for matching addresses.
+It is typically unused because the page store name maps to an address name by default.
+However, when addresses are hierarchical and subscriptions use 
+[wildcards](wildcard-routing.md), this setting is **required** to support [paging](paging.md).
+Subscriptions assume a single page store for cursor management and resource usage
+calculations. Using an explicitly configured `page-store-name` that will match the
+root address of the hierarchy, paging can coalesce to a single page store and 
+the required assumptions will hold.
+
+For example, with a MULTICAST address hierarchy of:
+ - ticker.stock.us.apple
+ - ticker.stock.us.orange
+ - ticker.stock.eu.pear
+ 
+ and with wildcard subscriptions on:
+  - ticker.stock.#
+  - ticker.stock.eu.#
+  
+ an address setting of: 
+ 
+ ```xml
+ <address-settings>
+    <address-setting match="ticker.stock.#">
+       <page-store-name>ticker.stock.#</page-store-name>
+       ...
+ ```
+ will ensure that all paged messages coalesce into a single page store named `ticker.stock.#`.
+ The name does not need to be the same as the `match` attribute, it can be any string value.
+ What **is** important is that the `match` attribute captures the root of the hierarchy that will
+ support wildcard subscriptions.
+ 
+ 
 `message-counter-history-day-limit` is the number of days to keep message
 counter history for this address assuming that `message-counter-enabled` is
 `true`. Default is `0`.
diff --git a/docs/user-manual/en/wildcard-routing.md b/docs/user-manual/en/wildcard-routing.md
index 0db7748..7fef8f3 100644
--- a/docs/user-manual/en/wildcard-routing.md
+++ b/docs/user-manual/en/wildcard-routing.md
@@ -20,5 +20,18 @@ This functionality is enabled by default. To turn it off add the following to th
 </wildcard-addresses>
 ```
 
+## Paging with wild card addresses
+Paging occurs at the address level and queue subscriptions access messages for an address through paging.
+When wildcard routing is in play, it is normal for a queue to access multiple addresses and hence, potentially
+multiple page stores.
+To avoid the problems inherent in referencing multiple page stores, it is necessary to configure a wildcard address
+hierarchy with a single shared page store via an address setting called `page-store-name`.
+
+```xml
+<address-setting match="news.#">
+   <page-store-name>news-wildcard</page-store-name>
+</address-setting>
+```
+
 For more information on the wild card syntax and how to configure it, take a look at [wildcard syntax](wildcard-syntax.md) chapter,
 also see the topic hierarchy example in the [examples](examples.md).
diff --git a/examples/features/standard/topic-hierarchies/pom.xml b/examples/features/standard/topic-hierarchies/pom.xml
index 0079923..f830461 100644
--- a/examples/features/standard/topic-hierarchies/pom.xml
+++ b/examples/features/standard/topic-hierarchies/pom.xml
@@ -56,10 +56,7 @@ under the License.
                   </goals>
                   <configuration>
                      <ignore>${noServer}</ignore>
-                     <args>
-                        <arg>--addresses</arg>
-                        <arg>news,news.usa,news.usa.wrestling,news.europe,news.europe.sport,news.europe.entertainment</arg>
-                     </args>
+                     <configuration>${basedir}/target/classes/activemq/server0</configuration>
                   </configuration>
                </execution>
                <execution>
diff --git a/examples/features/standard/topic-hierarchies/readme.md b/examples/features/standard/topic-hierarchies/readme.md
index 2f70bc7..9f3adc0 100644
--- a/examples/features/standard/topic-hierarchies/readme.md
+++ b/examples/features/standard/topic-hierarchies/readme.md
@@ -8,4 +8,14 @@ ActiveMQ Artemis wild-cards can use the character `#` which means "match any num
 
 For example if I subscribe using the wild-card `news.europe.#`, then that would match messages sent to the addresses `news.europe`, `news.europe.sport` and `news.europe.entertainment`, but it does not match messages sent to the address `news.usa.wrestling`.
 
+Note that wildcard subscribers need some explicit configuration with respect to paging. The entire hierarchy needs to page to a single page store so that subscribers don't race to store and account for individual messages.
+
+Notice the address setting in broker.xml whose match (the root of the hierarchy) configures all matching addresses to use the shared "news-wildcard" page store.
+
+```xml
+         <address-setting match="news.#">
+            <page-store-name>news-wildcard</page-store-name>
+         </address-setting>
+```
+
 For more information on the wild-card syntax please consult the user manual.
\ No newline at end of file
diff --git a/examples/features/standard/topic-hierarchies/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/topic-hierarchies/src/main/resources/activemq/server0/broker.xml
new file mode 100644
index 0000000..32e6934
--- /dev/null
+++ b/examples/features/standard/topic-hierarchies/src/main/resources/activemq/server0/broker.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+   <core xmlns="urn:activemq:core">
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/largemessages</large-messages-directory>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
+      </acceptors>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="#">
+            <permission roles="guest" type="createDurableQueue"/>
+            <permission roles="guest" type="deleteDurableQueue"/>
+            <permission roles="guest" type="createNonDurableQueue"/>
+            <permission roles="guest" type="deleteNonDurableQueue"/>
+            <permission roles="guest" type="createAddress"/>
+            <permission roles="guest" type="deleteAddress"/>
+            <permission roles="guest" type="consume"/>
+            <permission roles="guest" type="send"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- ensure that all addresses in the topic hierarchy reference a single shared page store -->
+         <address-setting match="news.#">
+            <page-store-name>news-wildcard</page-store-name>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="news"/>
+         <address name="news.usa"/>
+         <address name="news.usa.wrestling"/>
+         <address name="news.europe"/>
+         <address name="news.europe.sport"/>
+         <address name="news.europe.entertainment"/>
+      </addresses>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/WildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/WildcardTest.java
new file mode 100644
index 0000000..08b37d5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/WildcardTest.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(Parameterized.class)
+public class WildcardTest extends JMSTestBase {
+
+   /**
+    * Supplies the parameter sets for this test: two concrete topic names followed by a wildcard topic name.
+    *
+    * @return one {@code Object[]} of three strings per test run
+    */
+   @Parameters(name = "a={0},b={1},c={2}")
+   public static Iterable<Object[]> data() {
+      final Object[] wildcardBelowTopic = {"test.topic.A", "test.topic.B", "test.topic.#"};
+      final Object[] wildcardBelowTest = {"test.topic.A", "test.topic.B", "test.#"};
+      return Arrays.asList(new Object[][] {wildcardBelowTopic, wildcardBelowTest});
+   }
+
+   private String topicA;
+   private String topicB;
+   private String topicWildcard;
+
+   @Override
+   protected Configuration createDefaultConfig(boolean netty) throws Exception {
+      Configuration configuration =  super.createDefaultConfig(netty).setJMXManagementEnabled(true);
+      configuration.getAddressesSettings().put("test.#", new AddressSettings().setPageStoreName(new SimpleString("test-topic-hierarchy-root")));
+      return configuration;
+   }
+
+   /**
+    * Stores one parameter set produced by {@link #data()}.
+    *
+    * @param topicA        name of the first topic
+    * @param topicB        name of the second topic
+    * @param topicWildcard name of the wildcard topic the tests subscribe to
+    */
+   public WildcardTest(String topicA, String topicB, String topicWildcard) {
+      super();
+
+      this.topicA = topicA;
+      this.topicB = topicB;
+      this.topicWildcard = topicWildcard;
+   }
+
+   /**
+    * Sends one message to topicA and verifies that both the topicA consumer and the wildcard consumer
+    * receive it, with the same JMS message id and the same payload.
+    */
+   @Test
+   public void testWildcard1Topic() throws Exception {
+      Session         sessionA   = createSession();
+      MessageProducer producerA  = createProducer(sessionA, topicA);
+
+      // both consumers are created before the message is sent
+      MessageConsumer consumerA  = createConsumer(topicA);
+      MessageConsumer consumerWC = createConsumer(topicWildcard);
+
+      Message message = sessionA.createObjectMessage(1);
+      producerA.send(message);
+
+      ObjectMessage received1 = (ObjectMessage)consumerA.receive(500);
+      Assert.assertNotNull(received1);
+      Assert.assertNotNull(received1.getObject());
+
+      ObjectMessage received2 = (ObjectMessage)consumerWC.receive(500);
+      Assert.assertNotNull(received2);
+      Assert.assertNotNull(received2.getObject());
+
+      // the two deliveries must be the same message
+      Assert.assertEquals(received1.getJMSMessageID(), received2.getJMSMessageID());
+      Assert.assertEquals(received1.getObject(), received2.getObject());
+   }
+
+   /**
+    * Sends one message to topicA and one to topicB, each through its own session, and verifies that
+    * the per-topic consumers each receive their message while the wildcard consumer receives both,
+    * in send order, with matching JMS message ids and payloads.
+    */
+   @Test
+   public void testWildcard2Topics() throws Exception {
+      Session         sessionA   = createSession();
+      MessageProducer producerA  = createProducer(sessionA, topicA);
+
+      // producerB must belong to sessionB, the session that also creates message2 below
+      Session         sessionB   = createSession();
+      MessageProducer producerB  = createProducer(sessionB, topicB);
+
+      // all consumers are created before anything is sent
+      MessageConsumer consumerA  = createConsumer(topicA);
+      MessageConsumer consumerB  = createConsumer(topicB);
+      MessageConsumer consumerWC = createConsumer(topicWildcard);
+
+      Message message1 = sessionA.createObjectMessage(1);
+      producerA.send(message1);
+
+      Message message2 = sessionB.createObjectMessage(2);
+      producerB.send(message2);
+
+      ObjectMessage received1 = (ObjectMessage)consumerA.receive(500);
+      Assert.assertNotNull(received1);
+      Assert.assertNotNull(received1.getObject());
+
+      ObjectMessage received2 = (ObjectMessage)consumerB.receive(500);
+      Assert.assertNotNull(received2);
+      Assert.assertNotNull(received2.getObject());
+
+      // the wildcard consumer is expected to see both messages
+      ObjectMessage received3 = (ObjectMessage)consumerWC.receive(500);
+      Assert.assertNotNull(received3);
+      Assert.assertNotNull(received3.getObject());
+
+      ObjectMessage received4 = (ObjectMessage)consumerWC.receive(500);
+      Assert.assertNotNull(received4);
+      Assert.assertNotNull(received4.getObject());
+
+      // first wildcard delivery corresponds to the topicA message
+      Assert.assertEquals(received1.getJMSMessageID(), received3.getJMSMessageID());
+      Assert.assertEquals(received1.getObject(), received3.getObject());
+
+      // second wildcard delivery corresponds to the topicB message
+      Assert.assertEquals(received2.getJMSMessageID(), received4.getJMSMessageID());
+      Assert.assertEquals(received2.getObject(), received4.getObject());
+   }
+
+   // Synchronous-receive variants: each delegates to testNegativeAddressSizeOnWildcard(int)
+   // with a different message count (1, 2, 10, 100).
+   @Test
+   public void testNegativeAddressSizeOnWildcard1() throws Exception {
+      testNegativeAddressSizeOnWildcard(1);
+   }
+
+   @Test
+   public void testNegativeAddressSizeOnWildcard2() throws Exception {
+      testNegativeAddressSizeOnWildcard(2);
+   }
+
+   @Test
+   public void testNegativeAddressSizeOnWildcard10() throws Exception {
+      testNegativeAddressSizeOnWildcard(10);
+   }
+
+   @Test
+   public void testNegativeAddressSizeOnWildcard100() throws Exception {
+      testNegativeAddressSizeOnWildcard(100);
+   }
+
+   // Listener-based variants: each delegates to testNegativeAddressSizeOnWildcardAsync(int)
+   // with a different message count (1, 2, 10, 100).
+   @Test
+   public void testNegativeAddressSizeOnWildcardAsync1() throws Exception {
+      testNegativeAddressSizeOnWildcardAsync(1);
+   }
+
+   @Test
+   public void testNegativeAddressSizeOnWildcardAsync2() throws Exception {
+      testNegativeAddressSizeOnWildcardAsync(2);
+   }
+
+   @Test
+   public void testNegativeAddressSizeOnWildcardAsync10() throws Exception {
+      testNegativeAddressSizeOnWildcardAsync(10);
+   }
+
+   @Test
+   public void testNegativeAddressSizeOnWildcardAsync100() throws Exception {
+      testNegativeAddressSizeOnWildcardAsync(100);
+   }
+
+   private void testNegativeAddressSizeOnWildcard(int numMessages) throws Exception {
+      Session         sessionA   = createSession();
+      MessageProducer producerA  = createProducer(sessionA, topicA);
+
+      MessageConsumer consumerA  = createConsumer(topicA);
+      MessageConsumer consumerWC = createConsumer(topicWildcard);
+
+      for (int i = 0; i < numMessages; i++) {
+         Message message = sessionA.createObjectMessage(i);
+         producerA.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ObjectMessage received1 = (ObjectMessage)consumerA.receive(500);
+         Assert.assertNotNull("consumerA message - " + i + " is null", received1);
+         Assert.assertNotNull("consumerA message - " + i + " is null", received1.getObject());
+
+         ObjectMessage received2 = (ObjectMessage)consumerWC.receive(500);
+         Assert.assertNotNull("consumerWC message - " + i + " is null", received2);
+         Assert.assertNotNull("consumerWC message - " + i + " is null", received2.getObject());
+      }
+
+      long addressSizeA  = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicA + "\""), "AddressSize");
+      long addressSizeWC = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicWildcard + "\""), "AddressSize");
+
+      Assert.assertTrue(topicA + " AddressSize < 0", addressSizeA >= 0);
+      Assert.assertTrue(topicWildcard + " AddressSize < 0", addressSizeWC >= 0);
+   }
+
+   /**
+    * Listener-based variant: sends {@code numMessages} messages to topicA, waits up to five seconds per
+    * latch for the topicA and wildcard listeners to count them all, then asserts that the
+    * {@code AddressSize} JMX attribute of both addresses is not negative.
+    *
+    * @param numMessages number of messages to send and to expect on each consumer
+    */
+   private void testNegativeAddressSizeOnWildcardAsync(int numMessages) throws Exception {
+      Session         sessionA   = createSession();
+      MessageProducer producerA  = createProducer(sessionA, topicA);
+
+      // one latch per consumer, counted down once for every delivered message
+      CountDownLatch  latchA    = new CountDownLatch(numMessages);
+      MessageConsumer consumerA = createAsyncConsumer(topicA, latchA);
+
+      CountDownLatch  latchWC    = new CountDownLatch(numMessages);
+      MessageConsumer consumerWC = createAsyncConsumer(topicWildcard, latchWC);
+
+      for (int i = 0; i < numMessages; i++) {
+         Message message = sessionA.createObjectMessage(i);
+
+         producerA.send(message);
+      }
+
+      // fail with the number of still-missing messages if a latch does not reach zero in time
+      if (!latchA.await(5, TimeUnit.SECONDS)) {
+         Assert.fail("Waiting to receive " + latchA.getCount() + " messages on " + topicA);
+      }
+
+      if (!latchWC.await(5, TimeUnit.SECONDS)) {
+         Assert.fail("Waiting to receive " + latchWC.getCount() + " messages on " + topicWildcard);
+      }
+
+      long addressSizeA  = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicA + "\""), "AddressSize");
+      long addressSizeWC = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicWildcard + "\""), "AddressSize");
+
+      Assert.assertTrue(topicA + " AddressSize < 0", addressSizeA >= 0);
+      Assert.assertTrue(topicWildcard + " AddressSize < 0", addressSizeWC >= 0);
+   }
+
+   /**
+    * Opens a new connection and returns a non-transacted, auto-acknowledge session created on it.
+    */
+   private Session createSession() throws Exception {
+      return createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+   }
+
+   private MessageProducer createProducer(Session session, String topicName) throws Exception {
+      Topic topic = session.createTopic(topicName);
+
+      MessageProducer producer = session.createProducer(topic);
+      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+      return producer;
+   }
+
+   private MessageConsumer createConsumer(String topicName) throws Exception {
+      Connection connection = createConnection();
+      connection.start();
+
+      Session    session    = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Topic      topic      = session.createTopic(topicName);
+
+      MessageConsumer consumer = session.createConsumer(topic, null, false);
+
+      return consumer;
+   }
+
+   /**
+    * Creates a consumer for the named topic whose message listener counts the latch down once per
+    * delivered message.
+    *
+    * @param topicName name of the topic to consume from
+    * @param latch     latch decremented for every message the listener receives
+    */
+   private MessageConsumer createAsyncConsumer(String topicName, CountDownLatch latch) throws Exception {
+      MessageConsumer consumer = createConsumer(topicName);
+      // countDown() declares no exceptions, so no try/catch is needed around it
+      consumer.setMessageListener(message -> latch.countDown());
+
+      return consumer;
+   }
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
new file mode 100644
index 0000000..03a6629
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.util.LinkedList;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.jgroups.util.UUID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport {
+
+   // id of the most recent message passed to the callback built by createCallback(); used for the manual ack
+   private int lastId;
+   // clients created by the tests and cleaned up in clean()
+   private MqttClient subscriber;
+   private MqttClient sender;
+   // topic name of every message passed to the callback, in arrival order
+   // NOTE(review): volatile only covers the reference; if the callback runs on a client thread the
+   // LinkedList itself (and lastId) is accessed without synchronization - confirm
+   private volatile LinkedList<String> topics = new LinkedList<>();
+
+   /**
+    * Clears the recorded topics and disconnects and closes whichever of the two clients is still
+    * connected, subscriber first.
+    */
+   @After
+   public void clean() throws MqttException {
+      topics.clear();
+      for (MqttClient client : new MqttClient[] {subscriber, sender}) {
+         if (client == null || !client.isConnected()) {
+            continue;
+         }
+         client.disconnect();
+         client.close();
+      }
+   }
+
+   @Override
+   protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration) {
+      configuration.getAddressesSettings().remove("#");
+      configuration.getAddressesSettings().put("#", new AddressSettings().setPageSizeBytes(5).setMaxSizeBytes(10).setPageStoreName(new SimpleString("news-bag")));
+      configuration.setGlobalMaxSize(15);
+      return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, 10);
+   }
+
+   /**
+    * Subscribes to {@code A.*} at QoS 2, disconnects, publishes two messages to {@code A.a}, asserts
+    * that the page store resolved for the subscription address is paging, then reconnects with the same
+    * client id and verifies that two messages arrive, each on topic {@code A/a}.
+    */
+   @Test
+   public void testWildcardSubAutoCreateDoesNotPageToWildcardAddress() throws Exception {
+
+      server.getManagementService().enableNotifications(false);
+
+      String subscriberId = UUID.randomUUID().toString();
+      String senderId = UUID.randomUUID().toString();
+      String subscribeTo = "A.*";
+      String publishTo = "A.a";
+
+      // create the subscription, then go offline before anything is published
+      subscriber = createMqttClient(subscriberId);
+      subscriber.subscribe(subscribeTo, 2);
+
+      subscriber.disconnect();
+
+      sender = createMqttClient(senderId);
+      sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false);
+      sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false);
+
+      assertTrue(server.getPagingManager().getPageStore(new SimpleString(subscribeTo)).isPaging());
+
+      // reconnect with the same client id; createMqttClient disables clean session
+      subscriber = createMqttClient(subscriberId);
+      subscriber.subscribe(subscribeTo, 2);
+
+      // the callback appends to topics; wait up to five seconds for both messages
+      boolean satisfied = Wait.waitFor(() -> topics.size() == 2, 5_000);
+      if (!satisfied) {
+         Assert.fail();
+      }
+
+      // manual acks are enabled, so acknowledge using the last message id seen by the callback
+      subscriber.messageArrivedComplete(lastId, 2);
+      subscriber.disconnect();
+      subscriber.close();
+
+      for (String topic : topics) {
+         assertEquals("A/a", topic);
+      }
+
+   }
+
+   private MqttClient createMqttClient(String clientId) throws MqttException {
+      MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
+      client.setCallback(createCallback());
+      client.setManualAcks(true);
+      MqttConnectOptions options = new MqttConnectOptions();
+      options.setCleanSession(false);
+      client.connect(options);
+      return client;
+   }
+
+   /**
+    * Builds a callback that records the topic of every arriving message in {@code topics} and
+    * remembers the id of the latest one in {@code lastId}; the other two callback methods do nothing.
+    */
+   private MqttCallback createCallback() {
+      return new MqttCallback() {
+
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws Exception {
+            topics.add(topic);
+            lastId = message.getId();
+         }
+
+         @Override
+         public void deliveryComplete(IMqttDeliveryToken token) {
+            // intentionally empty
+         }
+
+         @Override
+         public void connectionLost(Throwable cause) {
+            // intentionally empty
+         }
+      };
+   }
+
+   /**
+    * Creates durable subscriptions on {@code news.europe.#}, {@code news.#} and
+    * {@code news.europe.entertainment}, publishes to several topics below {@code news} and checks
+    * that the {@code news.europe.#} subscriber receives only the europe messages, in send order.
+    * Finally asserts that the paging manager reports exactly one page store.
+    */
+   @Test
+   public void testCoreHierarchicalTopic() throws Exception {
+      ConnectionFactory cf = new ActiveMQConnectionFactory();
+
+      Connection connection = cf.createConnection();
+      try {
+         connection.setClientID("CLI-ID");
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Topic topicSubscribe = ActiveMQJMSClient.createTopic("news.europe.#");
+
+         MessageConsumer messageConsumer = session.createDurableConsumer(topicSubscribe, "news-eu");
+
+         MessageProducer producer = session.createProducer(null);
+
+         Topic topicNewsUsaWrestling = ActiveMQJMSClient.createTopic("news.usa.wrestling");
+         Topic topicNewsEuropeSport = ActiveMQJMSClient.createTopic("news.europe.sport");
+         Topic topicNewsEuropeEntertainment = ActiveMQJMSClient.createTopic("news.europe.entertainment");
+
+         TextMessage messageWrestlingNews = session.createTextMessage("Hulk Hogan starts ballet classes");
+         // only the wrestling message carries the extra 1024-character property
+         addSizeProp(messageWrestlingNews);
+         producer.send(topicNewsUsaWrestling, messageWrestlingNews);
+
+         TextMessage messageEuropeSport = session.createTextMessage("Lewis Hamilton joins European synchronized swimming team");
+         producer.send(topicNewsEuropeSport, messageEuropeSport);
+
+         TextMessage messageEuropeEntertainment = session.createTextMessage("John Lennon resurrected from dead");
+         producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment);
+
+         connection.start();
+
+         // second consumer to page to different address
+         Topic topicSubscribeAllNews = ActiveMQJMSClient.createTopic("news.#");
+
+         MessageConsumer messageConsumerAllNews = session.createDurableConsumer(topicSubscribeAllNews, "news-all");
+
+         producer.send(topicNewsUsaWrestling, messageWrestlingNews);
+         producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment);
+
+         MessageConsumer messageConsumerEuEnt = session.createDurableConsumer(topicNewsEuropeEntertainment, "news-eu-ent");
+
+         producer.send(topicNewsUsaWrestling, messageWrestlingNews);
+         producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment);
+
+         System.out.println("Usage " + server.getPagingManager().getGlobalSize());
+
+         // every receive is null-checked so a missing message fails with a description instead of an NPE
+         TextMessage msg = (TextMessage) messageConsumerAllNews.receive(5000);
+         Assert.assertNotNull("news-all did not receive its first message", msg);
+
+         System.out.println("1 All received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
+
+         msg = (TextMessage) messageConsumerAllNews.receive(5000);
+         Assert.assertNotNull("news-all did not receive its second message", msg);
+
+         System.out.println("2 All received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
+
+         msg = (TextMessage) messageConsumerEuEnt.receive(5000);
+         Assert.assertNotNull("news-eu-ent did not receive a message", msg);
+
+         System.out.println("3 EuEnt received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
+
+         TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull("news-eu did not receive its first message", messageReceived1);
+
+         System.out.println("4 Received message: " + messageReceived1.getText() + ", dest: " + messageReceived1.getJMSDestination());
+
+         TextMessage messageReceived2 = (TextMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull("news-eu did not receive its second message", messageReceived2);
+
+         System.out.println("5 Received message: " + messageReceived2.getText() + ", dest: " + messageReceived2.getJMSDestination());
+
+         // verify messageConsumer gets messageEuropeEntertainment
+         msg = (TextMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull("news-eu did not receive its third message", msg);
+
+         System.out.println("6 Eu received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
+
+         assertEquals(topicNewsEuropeSport, messageReceived1.getJMSDestination());
+         assertEquals(topicNewsEuropeEntertainment, messageReceived2.getJMSDestination());
+         assertEquals(topicNewsEuropeEntertainment, msg.getJMSDestination());
+
+         messageConsumer.close();
+         messageConsumerAllNews.close();
+
+         int countOfPageStores = server.getPagingManager().getStoreNames().length;
+         assertEquals("there should only be one", 1, countOfPageStores);
+      } finally {
+         // release the connection even when an assertion above fails
+         connection.close();
+      }
+   }
+
+   /**
+    * Sets the string property {@code stuff} on the message to a string decoded from a zero-filled
+    * 1024-byte array.
+    */
+   private void addSizeProp(TextMessage messageWrestlingNews) throws JMSException {
+      messageWrestlingNews.setStringProperty("stuff", new String(new byte[1024]));
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 3066511..8972dae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -233,7 +233,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
       server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
 
-      log.debugv("Added connector {} to broker", getProtocolScheme());
+      log.debug("Added CORE connector to broker");
    }
 
    protected void addMQTTConnector() throws Exception {
@@ -243,7 +243,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
 
       server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:");
 
-      log.debugv("Added connector {} to broker", getProtocolScheme());
+      log.debug("Added MQTT connector to broker");
    }
 
    public void stopBroker() throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSizeWildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSizeWildcardTest.java
new file mode 100644
index 0000000..a1e64c6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSizeWildcardTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import java.util.Date;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Test;
+
+public class PagingSizeWildcardTest extends ActiveMQTestBase {
+
+   /**
+    * Starts a server whose {@code A.#} address settings name a shared page store, consumes five
+    * messages published to {@code A.a} on both a direct and a wildcard transacted consumer, commits
+    * the wildcard session first and then asserts that no page store reports a negative address size.
+    */
+   @Test
+   public void testWildcardPageSize() throws Exception {
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+      config.getAddressesSettings().put("A.#", new AddressSettings().setPageStoreName(new SimpleString("shared-page-store-for-a#")));
+
+      ActiveMQServer server = createServer(true, config, 200, 400);
+      server.start();
+
+      ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      try {
+         Connection conn = cf.createConnection();
+         conn.start();
+         Session sessA = conn.createSession(true, Session.SESSION_TRANSACTED);
+         Topic subA = sessA.createTopic("A.a");
+         MessageConsumer consumerA = sessA.createConsumer(subA);
+
+         // the wildcard topic is created from the same session that consumes from it
+         Session sessW = conn.createSession(true, Session.SESSION_TRANSACTED);
+         Topic subW = sessW.createTopic("A.#");
+         MessageConsumer consumerW = sessW.createConsumer(subW);
+
+         final int numMessages = 5;
+         publish(cf, numMessages);
+
+         for (int i = 0; i < numMessages; i++) {
+            assertNotNull(" on " +  i, consumerA.receive(1000));
+            assertNotNull(" on " +  i, consumerW.receive(1000));
+         }
+
+         // commit in reverse order to dispatch
+         sessW.commit();
+         sessA.commit();
+
+         for (SimpleString psName : server.getPagingManager().getStoreNames()) {
+            assertTrue("non negative size: " + psName, server.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
+         }
+         conn.close();
+
+      } finally {
+         server.stop();
+      }
+   }
+
+
+   /**
+    * Same flow as the wildcard test but with two durable subscriptions ("1" and "2") on the single
+    * topic {@code A.a} and no custom address settings: both consume five messages in transacted
+    * sessions, the second session commits first, and no page store may report a negative address size.
+    */
+   @Test
+   public void testDurableSubReveresOrderAckPageSize() throws Exception {
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, 200, 400);
+      server.start();
+
+      ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      try {
+         Connection conn = cf.createConnection();
+         // a client id is set before the durable consumers are created
+         conn.setClientID("IDD");
+         conn.start();
+
+         Session sessA = conn.createSession(true, Session.SESSION_TRANSACTED);
+         Topic topic = sessA.createTopic("A.a");
+         MessageConsumer consumerA = sessA.createDurableConsumer(topic, "1");
+
+         Session sessW = conn.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer consumerW = sessW.createDurableConsumer(topic, "2");
+
+         final int numMessages = 5;
+         publish(cf, numMessages);
+
+         for (int i = 0; i < numMessages; i++) {
+            assertNotNull(" on " +  i, consumerA.receive(1000));
+            assertNotNull(" on " +  i, consumerW.receive(1000));
+         }
+
+         // commit in reverse order to dispatch
+         sessW.commit();
+         sessA.commit();
+
+         for (SimpleString psName : server.getPagingManager().getStoreNames()) {
+            assertTrue("non negative size: " + psName, server.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
+         }
+         conn.close();
+
+      } finally {
+         server.stop();
+      }
+   }
+
+   /**
+    * Publishes {@code numMessages} text messages (the current date as text) to topic {@code A.a}
+    * over a fresh connection, which is always closed afterwards.
+    *
+    * @param cf          factory used to open the publishing connection
+    * @param numMessages number of messages to send
+    */
+   private void publish(ActiveMQJMSConnectionFactory cf, int numMessages) throws Exception {
+      Connection conn = cf.createConnection();
+      try {
+         conn.start();
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic subA = sess.createTopic("A.a");
+         MessageProducer messageProducer = sess.createProducer(subA);
+
+         for (int i = 0; i < numMessages; i++) {
+            messageProducer.send(sess.createTextMessage(new Date().toString()));
+         }
+      } finally {
+         // close the connection even if a send fails, so it does not leak into later tests
+         conn.close();
+      }
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index bbf96ab..644adff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -6592,6 +6592,92 @@ public class PagingTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testHierarchicalPagingStoreNotDestroyed() throws Exception {
+      // Scenario: every address matching "A.#" is configured to page into one shared store
+      // named "A.#". The test sends messages through two concrete addresses, deletes their
+      // queues, asks the paging manager to delete the per-address stores, and then checks
+      // that the shared store is still listed: immediately, after reloadStores(), and after
+      // a full server restart.
+      clearDataRecreateServerDirs();
+
+      final SimpleString pageAddress = new SimpleString("A.#");
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+      config.getAddressesSettings().put("A.#", new AddressSettings().setPageStoreName(pageAddress));
+
+      // NOTE(review): 100 / 500 are presumably the page size and max address size, kept tiny so
+      // that a handful of 100-byte messages forces paging - confirm against createServer
+      server = createServer(true, config, 100, 500);
+
+      server.start();
+
+      final int numberOfMessages = 10;
+      final int messageSize = 100;
+
+      locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      final SimpleString addressA = new SimpleString("A.a.#");
+      session.createQueue(new QueueConfiguration(addressA));
+
+      final SimpleString addressB = new SimpleString("A.b.#");
+      session.createQueue(new QueueConfiguration(addressB));
+
+      final SimpleString produceAddressA = new SimpleString("A.a.a");
+      ClientProducer producerA = session.createProducer(produceAddressA);
+
+      final SimpleString produceAddressB = new SimpleString("A.b.a");
+      ClientProducer producerB = session.createProducer(produceAddressB);
+
+      byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         // the same message goes to both concrete addresses; each pair is committed on its own
+         producerA.send(message);
+         producerB.send(message);
+         session.commit();
+      }
+      producerA.close();
+      producerB.close();
+
+      assertPageStoreListed("after sending", pageAddress);
+      assertTrue(server.getPagingManager().getPageStore(pageAddress).isPaging());
+
+      session.deleteQueue(addressA);
+      session.deleteQueue(addressB);
+
+      session.close();
+
+      // deleting the stores of the concrete addresses must not remove the shared "A.#" store
+      server.getPagingManager().deletePageStore(produceAddressA);
+      server.getPagingManager().deletePageStore(produceAddressB);
+
+      sf.close();
+      locator.close();
+      locator = null;
+      sf = null;
+      assertPageStoreListed("after deletePageStore", pageAddress);
+      // Ensure wildcard store is still there
+      server.getPagingManager().reloadStores();
+      assertPageStoreListed("after reloadStores", pageAddress);
+      server.stop();
+
+      server.start();
+      assertPageStoreListed("after restart", pageAddress);
+      server.stop();
+   }
+
+   /**
+    * Asserts that the paging manager of {@code server} currently lists a store named
+    * {@code storeName}, reporting the stage and the listed store names on failure.
+    *
+    * @param stage     short label identifying which check in the calling test failed
+    * @param storeName name of the page store that must be present
+    * @throws Exception if the store names cannot be obtained from the paging manager
+    */
+   private void assertPageStoreListed(String stage, SimpleString storeName) throws Exception {
+      SimpleString[] storeNames = server.getPagingManager().getStoreNames();
+      assertTrue(stage + ": expected page store " + storeName + " in " + Arrays.toString(storeNames), Arrays.asList(storeNames).contains(storeName));
+   }
+
+   @Test
    public void testStopPagingWithoutConsumersIfTwoPages() throws Exception {
       // Runs the shared one-queue scenario with its boolean flag set to true.
       // NOTE(review): judging by this test's name the flag presumably selects the
       // two-page variant - confirm against testStopPagingWithoutConsumersOnOneQueue.
       testStopPagingWithoutConsumersOnOneQueue(true);
    }


Mime
View raw message