activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-1446 - Support Transformer configuration by properties
Date Mon, 09 Oct 2017 21:30:39 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master c94ca2d43 -> 8886ec292


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4db8cd54/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml
b/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml
index 567bf77..5e36d6e 100644
--- a/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml
+++ b/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml
@@ -62,8 +62,9 @@ under the License.
             <address>priceUpdates</address>
             <forwarding-address>priceForwarding</forwarding-address>
             <filter string="office='New York'"/>
-            <transformer-class-name>org.apache.activemq.artemis.jms.example.AddForwardingTimeTransformer
-            </transformer-class-name>
+            <transformer>
+               <class-name>org.apache.activemq.artemis.jms.example.AddForwardingTimeTransformer</class-name>
+            </transformer>
             <exclusive>true</exclusive>
          </divert>
       </diverts>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4db8cd54/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index e6a028f..88dff9b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@@ -69,7 +70,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
-import org.apache.activemq.artemis.core.server.cluster.Transformer;
+import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
@@ -948,7 +950,7 @@ public class BridgeTest extends ActiveMQTestBase {
       internaltestWithTransformer(true);
    }
 
-   public void internaltestWithTransformer(final boolean useFiles) throws Exception {
+   private void internaltestWithTransformer(final boolean useFiles) throws Exception {
       Map<String, Object> server0Params = new HashMap<>();
       server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
 
@@ -971,7 +973,7 @@ public class BridgeTest extends ActiveMQTestBase {
       ArrayList<String> staticConnectors = new ArrayList<>();
       staticConnectors.add(server1tc.getName());
 
-      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerClassName(SimpleTransformer.class.getName()).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerConfiguration(new
TransformerConfiguration().setClassName(SimpleTransformer.class.getName())).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
       bridgeConfigs.add(bridgeConfiguration);
@@ -1052,6 +1054,114 @@ public class BridgeTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testWithTransformerProperties() throws Exception {
+      final String propKey = "bridged";
+      final String propValue = "true";
+
+
+      TransformerConfiguration transformerConfiguration = new TransformerConfiguration().setClassName(AddHeadersTransformer.class.getName());
+      transformerConfiguration.getProperties().put(propKey, propValue);
+
+      Map<String, Object> server0Params = new HashMap<>();
+      server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      addTargetParameters(server1Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+
+      Map<String, TransportConfiguration> connectors = new HashMap<>();
+      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+      TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+      connectors.put(server1tc.getName(), server1tc);
+
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+
+      ArrayList<String> staticConnectors = new ArrayList<>();
+      staticConnectors.add(server1tc.getName());
+
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerConfiguration(transformerConfiguration).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
+      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+      server1.start();
+      server0.start();
+
+      locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+      ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+      ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+      ClientSession session0 = sf0.createSession(false, true, true);
+
+      ClientSession session1 = sf1.createSession(false, true, true);
+
+      ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+      session1.start();
+
+      final int numMessages = 10;
+
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+
+         message.getBodyBuffer().writeString("doo be doo be doo be doo");
+
+         producer0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer1.receive(200);
+
+         Assert.assertNotNull(message);
+
+         String messagePropVal = message.getStringProperty(propKey);
+
+         Assert.assertEquals(propValue, messagePropVal);
+
+         String sval = message.getBodyBuffer().readString();
+
+         Assert.assertEquals("doo be doo be doo be doo", sval);
+
+         message.acknowledge();
+
+      }
+
+      Assert.assertNull(consumer1.receiveImmediate());
+
+      session0.close();
+
+      session1.close();
+
+      sf0.close();
+
+      sf1.close();
+
+      if (server0.getConfiguration().isPersistenceEnabled()) {
+         assertEquals(0, loadQueues(server0).size());
+      }
+   }
+
+   @Test
    public void testSawtoothLoad() throws Exception {
       Map<String, Object> server0Params = new HashMap<>();
       ActiveMQServer server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4db8cd54/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 5311601..9f60b30 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -37,7 +37,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.api.core.RoutingType;
 
-import org.apache.activemq.artemis.core.server.cluster.Transformer;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4db8cd54/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index feec568..4a9f861 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -798,6 +798,32 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public void createDivert(String name,
+                                  String routingName,
+                                  String address,
+                                  String forwardingAddress,
+                                  boolean exclusive,
+                                  String filterString,
+                                  String transformerClassName,
+                                  Map<String, String> transformerProperties,
+                                  String routingType) throws Exception {
+            proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress,
exclusive, filterString, transformerClassName, transformerProperties, routingType);
+         }
+
+         @Override
+         public void createDivert(String name,
+                                  String routingName,
+                                  String address,
+                                  String forwardingAddress,
+                                  boolean exclusive,
+                                  String filterString,
+                                  String transformerClassName,
+                                  String transformerPropertiesAsJSON,
+                                  String routingType) throws Exception {
+            proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress,
exclusive, filterString, transformerClassName, transformerPropertiesAsJSON, routingType);
+         }
+
+         @Override
          public void destroyDivert(String name) throws Exception {
             proxy.invokeOperation("destroyDivert", name);
          }
@@ -875,6 +901,52 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
                                   String forwardingAddress,
                                   String filterString,
                                   String transformerClassName,
+                                  Map<String, String> transformerProperties,
+                                  long retryInterval,
+                                  double retryIntervalMultiplier,
+                                  int initialConnectAttempts,
+                                  int reconnectAttempts,
+                                  boolean useDuplicateDetection,
+                                  int confirmationWindowSize,
+                                  int producerWindowSize,
+                                  long clientFailureCheckPeriod,
+                                  String connectorNames,
+                                  boolean useDiscovery,
+                                  boolean ha,
+                                  String user,
+                                  String password) throws Exception {
+            proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString,
transformerClassName, transformerProperties, retryInterval, retryIntervalMultiplier, initialConnectAttempts,
reconnectAttempts, useDuplicateDetection, confirmationWindowSize, producerWindowSize, clientFailureCheckPeriod,
connectorNames, useDiscovery, ha, user, password);
+         }
+
+         @Override
+         public void createBridge(String name,
+                                  String queueName,
+                                  String forwardingAddress,
+                                  String filterString,
+                                  String transformerClassName,
+                                  String transformerPropertiesAsJSON,
+                                  long retryInterval,
+                                  double retryIntervalMultiplier,
+                                  int initialConnectAttempts,
+                                  int reconnectAttempts,
+                                  boolean useDuplicateDetection,
+                                  int confirmationWindowSize,
+                                  int producerWindowSize,
+                                  long clientFailureCheckPeriod,
+                                  String connectorNames,
+                                  boolean useDiscovery,
+                                  boolean ha,
+                                  String user,
+                                  String password) throws Exception {
+            proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString,
transformerClassName, transformerPropertiesAsJSON, retryInterval, retryIntervalMultiplier,
initialConnectAttempts, reconnectAttempts, useDuplicateDetection, confirmationWindowSize,
producerWindowSize, clientFailureCheckPeriod, connectorNames, useDiscovery, ha, user, password);
+         }
+
+         @Override
+         public void createBridge(String name,
+                                  String queueName,
+                                  String forwardingAddress,
+                                  String filterString,
+                                  String transformerClassName,
                                   long retryInterval,
                                   double retryIntervalMultiplier,
                                   int initialConnectAttempts,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4db8cd54/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
index 109e008..edd2667 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlTest.java
@@ -61,7 +61,9 @@ public class DivertControlTest extends ManagementTestBase {
 
       Assert.assertEquals(divertConfig.getForwardingAddress(), divertControl.getForwardingAddress());
 
-      Assert.assertEquals(divertConfig.getTransformerClassName(), divertControl.getTransformerClassName());
+      Assert.assertEquals(divertConfig.getTransformerConfiguration().getClassName(), divertControl.getTransformerClassName());
+
+      Assert.assertEquals(divertConfig.getTransformerConfiguration().getProperties(), divertControl.getTransformerProperties());
    }
 
    // Package protected ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4db8cd54/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
index 48528ce..2705ab6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
+import java.util.Map;
+
 import org.apache.activemq.artemis.api.core.management.DivertControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 
@@ -62,6 +64,16 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
          }
 
          @Override
+         public String getTransformerPropertiesAsJSON() {
+            return (String) proxy.retrieveAttributeValue("transformerPropertiesAsJSON");
+         }
+
+         @Override
+         public Map<String, String> getTransformerProperties() {
+            return (Map<String, String>) proxy.retrieveAttributeValue("transformerProperties");
+         }
+
+         @Override
          public String getRoutingType() {
             return (String) proxy.retrieveAttributeValue("routingType");
          }


Mime
View raw message