activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-250 fix scale-down with security
Date Tue, 24 Nov 2015 02:40:00 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 527da42f1 -> 2eecfdc71


ARTEMIS-250 fix scale-down with security


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c0a6d6ee
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c0a6d6ee
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c0a6d6ee

Branch: refs/heads/master
Commit: c0a6d6ee44be2d47bdc07c2fcb90c66bcfe019e0
Parents: 527da42
Author: jbertram <jbertram@apache.org>
Authored: Fri Nov 20 14:44:45 2015 -0600
Committer: jbertram <jbertram@apache.org>
Committed: Fri Nov 20 19:34:37 2015 -0600

----------------------------------------------------------------------
 .../core/server/cluster/ClusterControl.java     |  8 ++++
 .../core/server/impl/ScaleDownHandler.java      | 24 ++++++----
 .../cluster/distribution/ClusterTestBase.java   | 32 +++++++++++--
 .../integration/server/ScaleDown3NodeTest.java  | 49 +++++++++++---------
 .../integration/server/ScaleDownDirectTest.java |  2 +-
 5 files changed, 78 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0a6d6ee/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
index bfcff92..c213ff9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
@@ -197,4 +197,12 @@ public class ClusterControl implements AutoCloseable {
       ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
       clusterChannel.send(announceMessage);
    }
+
+   public String getClusterUser() {
+      return clusterUser;
+   }
+
+   public String getClusterPassword() {
+      return clusterPassword;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0a6d6ee/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index e335c50..48a56bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -91,19 +91,19 @@ public class ScaleDownHandler {
                          SimpleString targetNodeId) throws Exception {
       ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory);
       clusterControl.authorize();
-      long num = scaleDownMessages(sessionFactory, targetNodeId);
+      long num = scaleDownMessages(sessionFactory, targetNodeId, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
       ActiveMQServerLogger.LOGGER.info("Scaled down " + num + " messages total.");
-      scaleDownTransactions(sessionFactory, resourceManager);
-      scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress);
+      scaleDownTransactions(sessionFactory, resourceManager, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
+      scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
       clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId());
       return num;
    }
 
-   public long scaleDownMessages(ClientSessionFactory sessionFactory, SimpleString nodeId) throws Exception {
+   public long scaleDownMessages(ClientSessionFactory sessionFactory, SimpleString nodeId, String user, String password) throws Exception {
       long messageCount = 0;
       targetNodeId = nodeId != null ? nodeId.toString() : getTargetNodeId(sessionFactory);
 
-      try (ClientSession session = sessionFactory.createSession(false, true, true)) {
+      try (ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, 0)) {
          ClientProducer producer = session.createProducer();
 
          // perform a loop per address
@@ -307,9 +307,11 @@ public class ScaleDownHandler {
    }
 
    public void scaleDownTransactions(ClientSessionFactory sessionFactory,
-                                     ResourceManager resourceManager) throws Exception {
-      ClientSession session = sessionFactory.createSession(true, false, false);
-      ClientSession queueCreateSession = sessionFactory.createSession(false, true, true);
+                                     ResourceManager resourceManager,
+                                     String user,
+                                     String password) throws Exception {
+      ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0);
+      ClientSession queueCreateSession = sessionFactory.createSession(user, password, false, true, true, false, 0);
       List<Xid> preparedTransactions = resourceManager.getPreparedTransactions();
       Map<String, Long> queueIDs = new HashMap<>();
       for (Xid xid : preparedTransactions) {
@@ -398,8 +400,10 @@ public class ScaleDownHandler {
 
    public void scaleDownDuplicateIDs(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
                                      ClientSessionFactory sessionFactory,
-                                     SimpleString managementAddress) throws Exception {
-      ClientSession session = sessionFactory.createSession(true, false, false);
+                                     SimpleString managementAddress,
+                                     String user,
+                                     String password) throws Exception {
+      ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0);
       ClientProducer producer = session.createProducer(managementAddress);
       //todo - https://issues.jboss.org/browse/HORNETQ-1336
       for (SimpleString address : duplicateIDMap.keySet()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0a6d6ee/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index d3ece00..0f8fe5c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -489,18 +489,30 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
 
    }
 
+
+
    protected void createQueue(final int node,
                               final String address,
                               final String queueName,
                               final String filterVal,
                               final boolean durable) throws Exception {
+      createQueue(node, address, queueName, filterVal, durable, null, null);
+   }
+
+   protected void createQueue(final int node,
+                              final String address,
+                              final String queueName,
+                              final String filterVal,
+                              final boolean durable,
+                              final String user,
+                              final String password) throws Exception {
       ClientSessionFactory sf = sfs[node];
 
       if (sf == null) {
          throw new IllegalArgumentException("No sf at " + node);
       }
 
-      ClientSession session = addClientSession(sf.createSession(false, true, true));
+      ClientSession session = addClientSession(sf.createSession(user, password, false, true, true, false, 0));
 
       String filterString = null;
 
@@ -541,6 +553,16 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
                               final String queueName,
                               final String filterVal,
                               boolean autoCommitAcks) throws Exception {
+      addConsumer(consumerID, node, queueName, filterVal, autoCommitAcks, null, null);
+   }
+
+   protected void addConsumer(final int consumerID,
+                              final int node,
+                              final String queueName,
+                              final String filterVal,
+                              boolean autoCommitAcks,
+                              final String user,
+                              final String password) throws Exception {
       try {
          if (consumers[consumerID] != null) {
             throw new IllegalArgumentException("Already a consumer at " + node);
@@ -552,7 +574,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
             throw new IllegalArgumentException("No sf at " + node);
          }
 
-         ClientSession session = addClientSession(sf.createSession(false, false, autoCommitAcks));
+         ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, false, 0));
 
          String filterString = null;
 
@@ -1307,6 +1329,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
    }
 
    protected void setupSessionFactory(final int node, final boolean netty, boolean ha) throws Exception {
+      setupSessionFactory(node, netty, ha, null, null);
+   }
+
+   protected void setupSessionFactory(final int node, final boolean netty, boolean ha, final String user, final String password) throws Exception {
       if (sfs[node] != null) {
          throw new IllegalArgumentException("Already a factory at " + node);
       }
@@ -1335,7 +1361,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
       addServerLocator(locators[node]);
       ClientSessionFactory sf = createSessionFactory(locators[node]);
 
-      ClientSession session = sf.createSession();
+      ClientSession session = sf.createSession(user, password, false, true, true, false, 0);
       session.close();
       sfs[node] = sf;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0a6d6ee/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
index b611a2e..49ed8a2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
@@ -46,8 +46,11 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
    public void setUp() throws Exception {
       super.setUp();
       setupLiveServer(0, isFileStorage(), false, isNetty(), true);
+      servers[0].getConfiguration().setSecurityEnabled(true);
       setupLiveServer(1, isFileStorage(), false, isNetty(), true);
+      servers[1].getConfiguration().setSecurityEnabled(true);
       setupLiveServer(2, isFileStorage(), false, isNetty(), true);
+      servers[2].getConfiguration().setSecurityEnabled(true);
       LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
       ScaleDownConfiguration scaleDownConfiguration0 = new ScaleDownConfiguration();
       haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration0);
@@ -65,9 +68,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       Assert.assertEquals(61617, servers[0].getConfiguration().getConnectorConfigurations().get(scaleDownConnector).getParams().get(TransportConstants.PORT_PROP_NAME));
       scaleDownConfiguration0.getConnectors().add(scaleDownConnector);
       startServers(0, 1, 2);
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
+      setupSessionFactory(0, isNetty(), false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      setupSessionFactory(1, isNetty(), false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
+      setupSessionFactory(2, isNetty(), false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
       IntegrationTestLogger.LOGGER.info("===============================");
       IntegrationTestLogger.LOGGER.info("Node 0: " + servers[0].getClusterManager().getNodeId());
       IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId());
@@ -109,16 +112,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       final String queueName1 = "testQueue1";
 
       // create a queue on each node mapped to the same address
-      createQueue(0, addressName, queueName1, null, false);
-      createQueue(1, addressName, queueName1, null, false);
-      createQueue(2, addressName, queueName1, null, false);
+      createQueue(0, addressName, queueName1, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      createQueue(1, addressName, queueName1, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
+      createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
 
       // pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
       String snfAddress = "sf.cluster0." + servers[0].getNodeID().toString();
       Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
       snfQueue.pause();
 
-      ClientSession session = sfs[2].createSession(false, true, false);
+      ClientSession session = sfs[2].createSession(servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword(), false, true, false, false, 0);
 
       Message message;
 
@@ -152,7 +155,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       }
 
       // add a consumer to node 0 to trigger redistribution here
-      addConsumer(0, 0, queueName1, null);
+      addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
 
       // allow some time for redistribution to move the message to the SnF queue
       long timeout = 10000;
@@ -194,7 +197,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       Assert.assertEquals(0, messageCount);
 
       // get the messages from queue 1 on node 1
-      addConsumer(0, 1, queueName1, null);
+      addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
 
       // allow some time for redistribution to move the message to node 1
       start = System.currentTimeMillis();
@@ -246,26 +249,26 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       final String queueName3 = "testQueue3";
 
       // create a queue on each node mapped to the same address
-      createQueue(0, addressName, queueName1, null, false);
-      createQueue(1, addressName, queueName1, null, false);
-      createQueue(2, addressName, queueName1, null, false);
+      createQueue(0, addressName, queueName1, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      createQueue(1, addressName, queueName1, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
+      createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
 
       // create a queue on each node mapped to the same address
-      createQueue(0, addressName, queueName2, null, false);
-      createQueue(1, addressName, queueName2, null, false);
-      createQueue(2, addressName, queueName2, null, false);
+      createQueue(0, addressName, queueName2, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      createQueue(1, addressName, queueName2, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
+      createQueue(2, addressName, queueName2, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
 
       // create a queue on each node mapped to the same address
-      createQueue(0, addressName, queueName3, null, false);
-      createQueue(1, addressName, queueName3, null, false);
-      createQueue(2, addressName, queueName3, null, false);
+      createQueue(0, addressName, queueName3, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      createQueue(1, addressName, queueName3, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
+      createQueue(2, addressName, queueName3, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
 
       // pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
       String snfAddress = "sf.cluster0." + servers[0].getNodeID().toString();
       Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
       snfQueue.pause();
 
-      ClientSession session = sfs[2].createSession(false, true, false);
+      ClientSession session = sfs[2].createSession(servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword(), false, true, false, false, 0);
 
       Message message;
       message = session.createMessage(false);
@@ -276,8 +279,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       }
 
       // add a consumer to node 0 to trigger redistribution here
-      addConsumer(0, 0, queueName1, null);
-      addConsumer(1, 0, queueName3, null);
+      addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
 
       // allow some time for redistribution to move the message to the SnF queue
       long timeout = 10000;
@@ -323,8 +326,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
 
       // get the messages from queue 1 on node 1
-      addConsumer(0, 1, queueName1, null);
-      addConsumer(1, 1, queueName3, null);
+      addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
+      addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
 
       // allow some time for redistribution to move the message to node 1
       start = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0a6d6ee/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
index 6a94333..ce05b8a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
@@ -267,7 +267,7 @@ public class ScaleDownDirectTest extends ClusterTestBase {
    private long performScaledown() throws Exception {
       ScaleDownHandler handler = new ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), servers[0].getNodeManager(), servers[0].getClusterManager().getClusterController(), servers[0].getStorageManager());
 
-      return handler.scaleDownMessages(sfs[1], servers[1].getNodeID());
+      return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(), servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
    }
 
 }


Mime
View raw message