activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [3/4] activemq-artemis git commit: Add 'routing-type' to divert
Date Fri, 02 Dec 2016 04:44:16 GMT
Add 'routing-type' to divert


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/77684850
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/77684850
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/77684850

Branch: refs/heads/ARTEMIS-780
Commit: 77684850b19cf0e5c167381d58bea7a9d04dfbff
Parents: f575900
Author: jbertram <jbertram@apache.com>
Authored: Thu Dec 1 22:30:37 2016 -0600
Committer: jbertram <jbertram@apache.com>
Committed: Thu Dec 1 22:43:55 2016 -0600

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  10 ++
 .../core/management/ActiveMQServerControl.java  |  10 ++
 .../api/core/management/DivertControl.java      |   6 +
 .../artemis/core/server/RoutingType.java        |  10 +-
 .../core/config/DivertConfiguration.java        |  21 +++
 .../artemis/core/config/impl/Validators.java    |  14 ++
 .../deployers/impl/FileConfigurationParser.java |   4 +-
 .../impl/ActiveMQServerControlImpl.java         |  14 +-
 .../core/management/impl/DivertControlImpl.java |  10 ++
 .../core/server/ActiveMQMessageBundle.java      |   3 +
 .../core/server/impl/ActiveMQServerImpl.java    |   2 +-
 .../artemis/core/server/impl/DivertImpl.java    |  23 +++-
 .../resources/schema/artemis-configuration.xsd  |  16 +++
 .../tests/integration/divert/DivertTest.java    | 136 +++++++++++++++++++
 .../ActiveMQServerControlUsingCoreTest.java     |  12 ++
 .../management/DivertControlUsingCoreTest.java  |   5 +
 16 files changed, 291 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index e75c663..b4518fe 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -360,6 +360,9 @@ public final class ActiveMQDefaultConfiguration {
    // whether this is an exclusive divert
    private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
 
+   // how the divert should handle the message's routing type
+   private static String DEFAULT_DIVERT_ROUTING_TYPE = RoutingType.STRIP.toString();
+
    // If true then the server will request a backup on another node
    private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;
 
@@ -1007,6 +1010,13 @@ public final class ActiveMQDefaultConfiguration {
    }
 
    /**
+    * how the divert should handle the message's routing type
+    */
+   public static String getDefaultDivertRoutingType() {
+      return DEFAULT_DIVERT_ROUTING_TYPE;
+   }
+
+   /**
     * If true then the server will request a backup on another node
     */
    public static boolean isDefaultHapolicyRequestBackup() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index cd257c6..1797c9a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -899,6 +899,16 @@ public interface ActiveMQServerControl {
                      @Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
                      @Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName) throws Exception;
 
+   @Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
+   void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
+                     @Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
+                     @Parameter(name = "address", desc = "Address to divert from") String address,
+                     @Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
+                     @Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
+                     @Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
+                     @Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
+                     @Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
+
    @Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
    void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
index c99646b..7c103ca 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java
@@ -65,4 +65,10 @@ public interface DivertControl {
     */
    @Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this divert")
    String getTransformerClassName();
+
+   /**
+    * Returns the routing type used by this divert.
+    */
+   @Attribute(desc = "routing type used by this divert")
+   String getRoutingType();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
index 2f17335..c9b1d09 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server;
 
 public enum RoutingType {
 
-   MULTICAST, ANYCAST;
+   MULTICAST, ANYCAST, STRIP, PASS;
 
    public byte getType() {
       switch (this) {
@@ -26,6 +26,10 @@ public enum RoutingType {
             return 0;
          case ANYCAST:
             return 1;
+         case STRIP:
+            return 2;
+         case PASS:
+            return 3;
          default:
             return -1;
       }
@@ -37,6 +41,10 @@ public enum RoutingType {
             return MULTICAST;
          case 1:
             return ANYCAST;
+         case 2:
+            return STRIP;
+         case 3:
+            return PASS;
          default:
             return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
index a769f17..5326c72 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config;
 import java.io.Serializable;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 
 public class DivertConfiguration implements Serializable {
@@ -39,6 +40,8 @@ public class DivertConfiguration implements Serializable {
 
    private String transformerClassName = null;
 
+   private RoutingType routingType = RoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
+
    public DivertConfiguration() {
    }
 
@@ -70,6 +73,10 @@ public class DivertConfiguration implements Serializable {
       return transformerClassName;
    }
 
+   public RoutingType getRoutingType() {
+      return routingType;
+   }
+
    /**
     * @param name the name to set
     */
@@ -130,6 +137,14 @@ public class DivertConfiguration implements Serializable {
       return this;
    }
 
+   /**
+    * @param routingType the routingType to set
+    */
+   public DivertConfiguration setRoutingType(final RoutingType routingType) {
+      this.routingType = routingType;
+      return this;
+   }
+
    @Override
    public int hashCode() {
       final int prime = 31;
@@ -141,6 +156,7 @@ public class DivertConfiguration implements Serializable {
       result = prime * result + ((name == null) ? 0 : name.hashCode());
       result = prime * result + ((routingName == null) ? 0 : routingName.hashCode());
       result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode());
+      result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
       return result;
    }
 
@@ -185,6 +201,11 @@ public class DivertConfiguration implements Serializable {
             return false;
       } else if (!transformerClassName.equals(other.transformerClassName))
          return false;
+      if (routingType == null) {
+         if (other.routingType != null)
+            return false;
+      } else if (!routingType.equals(other.routingType))
+         return false;
       return true;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index bc57978..3e9bb4c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.config.impl;
 
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@@ -164,6 +165,19 @@ public final class Validators {
       }
    };
 
+   public static final Validator ROUTING_TYPE = new Validator() {
+      @Override
+      public void validate(final String name, final Object value) {
+         String val = (String) value;
+         if (val == null || !val.equals(RoutingType.ANYCAST.toString()) &&
+            !val.equals(RoutingType.MULTICAST.toString()) &&
+            !val.equals(RoutingType.PASS.toString()) &&
+            !val.equals(RoutingType.STRIP.toString())) {
+            throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
+         }
+      }
+   };
+
    public static final Validator MAX_QUEUE_CONSUMERS = new Validator() {
       @Override
       public void validate(String name, Object value) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 44d1a07..7b98602 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1554,6 +1554,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
 
+      RoutingType routingType = RoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.ROUTING_TYPE));
+
       String filterString = null;
 
       NodeList children = e.getChildNodes();
@@ -1566,7 +1568,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          }
       }
 
-      DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName);
+      DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(routingType);
 
       mainConfig.getDivertConfigurations().add(config);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 82f3943..4464062 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1895,11 +1895,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                             final boolean exclusive,
                             final String filterString,
                             final String transformerClassName) throws Exception {
+      createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
+   }
+
+   @Override
+   public void createDivert(final String name,
+                            final String routingName,
+                            final String address,
+                            final String forwardingAddress,
+                            final boolean exclusive,
+                            final String filterString,
+                            final String transformerClassName,
+                            final String routingType) throws Exception {
       checkStarted();
 
       clearIO();
       try {
-         DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName);
+         DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(RoutingType.valueOf(routingType));
          server.deployDivert(config);
       } finally {
          blockOnIO();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
index 6c47778..e87e333 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java
@@ -99,6 +99,16 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
    }
 
    @Override
+   public String getRoutingType() {
+      clearIO();
+      try {
+         return configuration.getRoutingType().toString();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public String getUniqueName() {
       clearIO();
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 1c20ba5..ee8f0ef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -410,4 +410,7 @@ public interface ActiveMQMessageBundle {
    IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType,
                                                          String address,
                                                          Set<RoutingType> supportedRoutingTypes);
+
+   @Message(id = 119208, value = "Invalid routing type {0}", format = Message.Format.MESSAGE_FORMAT)
+   IllegalArgumentException invalidRoutingType(String val);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index aadcba9..aebcb9a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1802,7 +1802,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       Filter filter = FilterImpl.createFilter(config.getFilterString());
 
-      Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager);
+      Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager, config.getRoutingType());
 
       Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 5782379..fd55521 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -16,12 +16,14 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Transformer;
 import org.jboss.logging.Logger;
@@ -49,6 +51,8 @@ public class DivertImpl implements Divert {
 
    private final StorageManager storageManager;
 
+   private final RoutingType routingType;
+
    public DivertImpl(final SimpleString forwardAddress,
                      final SimpleString uniqueName,
                      final SimpleString routingName,
@@ -56,7 +60,8 @@ public class DivertImpl implements Divert {
                      final Filter filter,
                      final Transformer transformer,
                      final PostOffice postOffice,
-                     final StorageManager storageManager) {
+                     final StorageManager storageManager,
+                     final RoutingType routingType) {
       this.forwardAddress = forwardAddress;
 
       this.uniqueName = uniqueName;
@@ -72,6 +77,8 @@ public class DivertImpl implements Divert {
       this.postOffice = postOffice;
 
       this.storageManager = storageManager;
+
+      this.routingType = routingType;
    }
 
    @Override
@@ -97,6 +104,20 @@ public class DivertImpl implements Divert {
 
          copy.setExpiration(message.getExpiration());
 
+         switch (routingType) {
+            case ANYCAST:
+               copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
+               break;
+            case MULTICAST:
+               copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
+               break;
+            case STRIP:
+               copy.removeProperty(Message.HDR_ROUTING_TYPE);
+               break;
+            case PASS:
+               break;
+         }
+
          if (transformer != null) {
             copy = transformer.transform(copy);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 7069c09..c9d1f5b 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1568,6 +1568,22 @@
          </xsd:element>
 
          <xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
+
+         <xsd:element name="routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  how should the routing-type on the diverted messages be set?
+               </xsd:documentation>
+            </xsd:annotation>
+            <xsd:simpleType>
+               <xsd:restriction base="xsd:string">
+                  <xsd:enumeration value="ANYCAST"/>
+                  <xsd:enumeration value="MULTICAST"/>
+                  <xsd:enumeration value="STRIP"/>
+                  <xsd:enumeration value="PASS"/>
+               </xsd:restriction>
+            </xsd:simpleType>
+         </xsd:element>
       </xsd:all>
 
       <xsd:attribute name="name" type="xsd:ID" use="required">

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index a9501d8..8774088 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -122,6 +122,142 @@ public class DivertTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testSingleNonExclusiveDivertWithRoutingType() throws Exception {
+      final String testAddress = "testAddress";
+
+      final String forwardAddress = "forwardAddress";
+
+      DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress);
+
+      Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf);
+
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+
+      server.start();
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      final SimpleString queueName1 = new SimpleString("queue1");
+
+      final SimpleString queueName2 = new SimpleString("queue2");
+
+      session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
+
+      session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false);
+
+      session.start();
+
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+
+      final int numMessages = 1;
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session.createMessage(false);
+
+         message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
+
+         message.putIntProperty(propKey, i);
+
+         producer.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer1.receive(DivertTest.TIMEOUT);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumer1.receiveImmediate());
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer2.receive(DivertTest.TIMEOUT);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumer2.receiveImmediate());
+   }
+
+   @Test
+   public void testSingleExclusiveDivertWithRoutingType() throws Exception {
+      final String testAddress = "testAddress";
+
+      final String forwardAddress = "forwardAddress";
+
+      DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
+
+      Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf);
+
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+
+      server.start();
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      final SimpleString queueName1 = new SimpleString("queue1");
+
+      final SimpleString queueName2 = new SimpleString("queue2");
+
+      session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
+
+      session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false);
+
+      session.start();
+
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+
+      final int numMessages = 1;
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session.createMessage(false);
+
+         message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
+
+         message.putIntProperty(propKey, i);
+
+         producer.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer1.receive(DivertTest.TIMEOUT);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumer1.receiveImmediate());
+   }
+
+   @Test
    public void testSingleDivertWithExpiry() throws Exception {
       final String testAddress = "testAddress";
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 985b495..280fdc4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -713,6 +713,18 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public void createDivert(String name,
+                                  String routingName,
+                                  String address,
+                                  String forwardingAddress,
+                                  boolean exclusive,
+                                  String filterString,
+                                  String transformerClassName,
+                                  String routingType) throws Exception {
+            proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, routingType);
+         }
+
+         @Override
          public void destroyDivert(String name) throws Exception {
             proxy.invokeOperation("destroyDivert", name);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
index 61ecda2..48528ce 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
@@ -62,6 +62,11 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
          }
 
          @Override
+         public String getRoutingType() {
+            return (String) proxy.retrieveAttributeValue("routingType");
+         }
+
+         @Override
          public String getUniqueName() {
             return (String) proxy.retrieveAttributeValue("uniqueName");
          }


Mime
View raw message