Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BE79618463 for ; Thu, 18 Feb 2016 16:19:38 +0000 (UTC) Received: (qmail 15215 invoked by uid 500); 18 Feb 2016 16:19:37 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 15037 invoked by uid 500); 18 Feb 2016 16:19:37 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13263 invoked by uid 99); 18 Feb 2016 16:19:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Feb 2016 16:19:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6B99FDFF67; Thu, 18 Feb 2016 16:19:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 18 Feb 2016 16:20:15 -0000 Message-Id: In-Reply-To: <35148000a30e4598a53a04753d684ec9@git.apache.org> References: <35148000a30e4598a53a04753d684ec9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [41/43] activemq-artemis git commit: Added openwire parameters as bean properties so that it can be passed via the new protocol manager bean util. Added openwire parameters as bean properties so that it can be passed via the new protocol manager bean util. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2bbcf07a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2bbcf07a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2bbcf07a Branch: refs/heads/refactor-openwire Commit: 2bbcf07a02da087840f7988ff57ef9fc7cdb7460 Parents: 3bc869e Author: Howard Gao Authored: Wed Feb 17 20:50:33 2016 +0800 Committer: Clebert Suconic Committed: Thu Feb 18 11:18:11 2016 -0500 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 25 ++----------- .../openwire/OpenWireProtocolManager.java | 38 +++++++++++++++++--- .../artemiswrapper/OpenwireArtemisBaseTest.java | 20 ++++++++--- .../transport/failover/FailoverClusterTest.java | 11 ++++-- .../failover/FailoverComplexClusterTest.java | 16 ++++++--- .../failover/FailoverPriorityTest.java | 22 ++++++++---- .../failover/FailoverUpdateURIsTest.java | 6 +++- 7 files changed, 94 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bbcf07a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 61e93cb..a6f0f34 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -55,7 +55,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConcurrentHashSet; -import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; @@ -153,10 +152,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S private String defaultSocketURIString; - private boolean rebalance; - private boolean updateClusterClients; - private boolean updateClusterClientsOnRemove; - public OpenWireConnection(Acceptor acceptorUsed, Connection connection, OpenWireProtocolManager openWireProtocolManager, @@ -167,12 +162,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S this.wireFormat = wf; this.creationTime = System.currentTimeMillis(); this.defaultSocketURIString = connection.getLocalAddress(); - - //Clebert: These are parameters specific to openwire cluster with defaults as specified at - //http://activemq.apache.org/failover-transport-reference.html - rebalance = ConfigurationHelper.getBooleanProperty("rebalance-cluster-clients", true, acceptorUsed.getConfiguration()); - updateClusterClients = ConfigurationHelper.getBooleanProperty("update-cluster-clients", true, acceptorUsed.getConfiguration()); - updateClusterClientsOnRemove = ConfigurationHelper.getBooleanProperty("update-cluster-clients-on-remove", true, acceptorUsed.getConfiguration()); } @Override @@ -200,10 +189,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S return info.getPassword(); } - public boolean isRebalance() { - return rebalance; - } - private ConnectionInfo getConnectionInfo() { if (state == null) { return null; @@ -539,9 +524,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S Response resp = new ExceptionResponse(e); return resp; } - if (info.isManageable() && this.isUpdateClusterClients()) { + if (info.isManageable() && protocolManager.isUpdateClusterClients()) { // send ConnectionCommand - ConnectionControl command = protocolManager.newConnectionControl(rebalance); + ConnectionControl command = protocolManager.newConnectionControl(); command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration()); if (info.isFailoverReconnect()) { command.setRebalanceConnection(false); @@ -1274,16 +1259,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S public void updateClient(ConnectionControl control) { // if (!destroyed && context.isFaultTolerant()) { - if (updateClusterClients) { + if (protocolManager.isUpdateClusterClients()) { dispatchAsync(control); } // } } - public boolean isUpdateClusterClients() { - return updateClusterClients; - } - public AMQConnectionContext initContext(ConnectionInfo info) { WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo(); // Older clients should have been defaulting this field to true.. but http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bbcf07a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 525844c..bd26b07 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -148,6 +148,12 @@ public class OpenWireProtocolManager implements ProtocolManager, No private final ScheduledExecutorService scheduledPool; + //bean properties + //http://activemq.apache.org/failover-transport-reference.html + private boolean rebalanceClusterClients = false; + private boolean updateClusterClients = false; + private boolean updateClusterClientsOnRemove = false; + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -189,7 +195,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No } for (OpenWireConnection c : this.connections) { - ConnectionControl control = newConnectionControl(c.isRebalance()); + ConnectionControl control = newConnectionControl(); c.updateClient(control); } } @@ -422,13 +428,13 @@ public class OpenWireProtocolManager implements ProtocolManager, No return brokerName; } - protected ConnectionControl newConnectionControl(boolean rebalance) { + protected ConnectionControl newConnectionControl() { ConnectionControl control = new ConnectionControl(); - String uri = generateMembersURI(rebalance); + String uri = generateMembersURI(rebalanceClusterClients); control.setConnectedBrokers(uri); - control.setRebalanceConnection(rebalance); + control.setRebalanceConnection(rebalanceClusterClients); return control; } @@ -814,4 +820,28 @@ public class OpenWireProtocolManager implements ProtocolManager, No brokerInfo.setPeerBrokerInfos(null); connection.dispatchAsync(brokerInfo); } + + public void setRebalanceClusterClients(boolean rebalance) { + this.rebalanceClusterClients = rebalance; + } + + public boolean isRebalanceClusterClients() { + return this.rebalanceClusterClients; + } + + public void setUpdateClusterClients(boolean updateClusterClients) { + this.updateClusterClients = updateClusterClients; + } + + public boolean isUpdateClusterClients() { + return this.updateClusterClients; + } + + public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { + this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; + } + + public boolean isUpdateClusterClientsOnRemove() { + return this.updateClusterClientsOnRemove; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bbcf07a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java index be9cf06..5c8d3b6 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java @@ -19,7 +19,9 @@ package org.apache.activemq.broker.artemiswrapper; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; @@ -31,6 +33,8 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.artemis.utils.uri.URISchema; +import org.apache.activemq.artemis.utils.uri.URISupport; import org.apache.activemq.broker.BrokerService; import org.junit.Assert; import org.junit.Rule; @@ -88,7 +92,7 @@ public class OpenwireArtemisBaseTest { public String CLUSTER_PASSWORD = "OPENWIRECLUSTER"; protected Configuration createConfig(final int serverID) throws Exception { - return createConfig("localhost", serverID); + return createConfig("localhost", serverID, Collections.EMPTY_MAP); } protected Configuration createConfig(final String hostAddress, final int serverID, final int port) throws Exception { @@ -111,6 +115,10 @@ public class OpenwireArtemisBaseTest { } protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception { + return createConfig(hostAddress, serverID, Collections.EMPTY_MAP); + } + + protected Configuration createConfig(final String hostAddress, final int serverID, Map params) throws Exception { ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). setJournalDirectory(getJournalDir(serverID, false)). @@ -123,7 +131,7 @@ public class OpenwireArtemisBaseTest { configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); - configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID)); + configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID) + "?" + URISupport.createQueryString(params)); configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID)); return configuration; @@ -171,8 +179,12 @@ public class OpenwireArtemisBaseTest { return "tcp://" + localhostAddress + ":" + (61616 + serverID); } - protected static String newURIwithPort(String localhostAddress, int port) { - return "tcp://" + localhostAddress + ":" + port; + protected static String newURIwithPort(String localhostAddress, int port) throws Exception { + return newURIwithPort(localhostAddress, port, Collections.EMPTY_MAP); + } + + protected static String newURIwithPort(String localhostAddress, int port, Map params) throws Exception { + return "tcp://" + localhostAddress + ":" + port + "?" + URISupport.createQueryString(params); } public static JMSServerControl createJMSServerControl(final MBeanServer mbeanServer) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bbcf07a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index bf43caa..74fa6aa 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -22,8 +22,10 @@ import javax.jms.Queue; import javax.jms.Session; import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -50,8 +52,13 @@ public class FailoverClusterTest extends OpenwireArtemisBaseTest { @Before public void setUp() throws Exception { - Configuration config1 = createConfig(1); - Configuration config2 = createConfig(2); + Map params = new HashMap(); + + params.put("rebalanceClusterClients", "true"); + params.put("updateClusterClients", "true"); + + Configuration config1 = createConfig("localhost", 1, params); + Configuration config2 = createConfig("localhost", 2, params); deployClusterConfiguration(config1, 2); deployClusterConfiguration(config2, 1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bbcf07a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java index 1d902e3..fd9ce1f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java @@ -65,9 +65,15 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest { //default setup for most tests private void commonSetup() throws Exception { - Configuration config0 = createConfig(0); - Configuration config1 = createConfig(1); - Configuration config2 = createConfig(2); + Map params = new HashMap(); + + params.put("rebalanceClusterClients", "true"); + params.put("updateClusterClients", "true"); + params.put("updateClusterClientsOnRemove", "true"); + + Configuration config0 = createConfig("localhost", 0, params); + Configuration config1 = createConfig("localhost", 1, params); + Configuration config2 = createConfig("localhost", 2, params); deployClusterConfiguration(config0, 1, 2); deployClusterConfiguration(config1, 0, 2); @@ -248,9 +254,9 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest { @Test public void testFailOverWithUpdateClientsOnRemove() throws Exception { // Broker A - Configuration config0 = createConfig(0, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true"); + Configuration config0 = createConfig(0, "?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true"); // Broker B - Configuration config1 = createConfig(1, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true"); + Configuration config1 = createConfig(1, "?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true"); deployClusterConfiguration(config0, 1); deployClusterConfiguration(config1, 0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bbcf07a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java index 6e559e7..6f4b27e 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.failover; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.ActiveMQConnection; @@ -49,11 +50,16 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest { private final List connections = new ArrayList(); private EmbeddedJMS[] servers = new EmbeddedJMS[3]; private String clientUrl; + private Map params = new HashMap(); @Before public void setUp() throws Exception { urls.put(0, BROKER_A_CLIENT_TC_ADDRESS); urls.put(1, BROKER_B_CLIENT_TC_ADDRESS); + params.clear(); + params.put("rebalanceClusterClients", "true"); + params.put("updateClusterClients", "true"); + params.put("updateClusterClientsOnRemove", "true"); } @After @@ -136,7 +142,7 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest { @Test public void testThreeBrokers() throws Exception { - commonSetup(); + setupThreeBrokers(); Thread.sleep(1000); setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false"); @@ -262,11 +268,15 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest { } } - //default setup for most tests - private void commonSetup() throws Exception { - Configuration config0 = createConfig("127.0.0.1", 0); - Configuration config1 = createConfig("127.0.0.1", 1); - Configuration config2 = createConfig("127.0.0.1", 2); + private void setupThreeBrokers() throws Exception { + + params.put("rebalanceClusterClients", "false"); + params.put("updateClusterClients", "false"); + params.put("updateClusterClientsOnRemove", "false"); + + Configuration config0 = createConfig("127.0.0.1", 0, params); + Configuration config1 = createConfig("127.0.0.1", 1, params); + Configuration config2 = createConfig("127.0.0.1", 2, params); deployClusterConfiguration(config0, 1, 2); deployClusterConfiguration(config1, 0, 2); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bbcf07a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java index 002a788..0a127dd 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.transport.failover; import java.io.File; import java.io.FileOutputStream; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -117,7 +119,9 @@ public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest { @Test public void testAutoUpdateURIs() throws Exception { - Configuration config0 = createConfig(0); + Map params = new HashMap(); + params.put("updateClusterClients", "true"); + Configuration config0 = createConfig("localhost", 0, params); deployClusterConfiguration(config0, 10); server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); server0.start();