From commits-return-50361-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Thu Feb 1 00:05:46 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 9B9F0180662 for ; Thu, 1 Feb 2018 00:05:46 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8B1E7160C55; Wed, 31 Jan 2018 23:05:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5C4F2160C42 for ; Thu, 1 Feb 2018 00:05:45 +0100 (CET) Received: (qmail 46298 invoked by uid 500); 31 Jan 2018 23:05:44 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 46284 invoked by uid 99); 31 Jan 2018 23:05:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Jan 2018 23:05:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 65BD1E3AA9; Wed, 31 Jan 2018 23:05:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 31 Jan 2018 23:05:44 -0000 Message-Id: In-Reply-To: <5cabd6d6923a4535b570e70765e36f2c@git.apache.org> References: <5cabd6d6923a4535b570e70765e36f2c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] activemq-artemis git commit: ARTEMIS-1639 HornetQClientProtocolManager sending unsupported packet ARTEMIS-1639 HornetQClientProtocolManager sending unsupported packet HornetQClientProtocolManager is used to connect HornteQ servers. During reconnect, it sends a CheckFailoverMessage packet to the server as part of reconnection. This packet is not supported by HornetQ server (existing release), so it will break the backward compatibility. Also fixed a failover issue where a hornetq NettyConnector's ConnectorFactory is serialized to the clients who cannot instantiate it because class not found exception. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1693db01 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1693db01 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1693db01 Branch: refs/heads/master Commit: 1693db0177c80a87444438839396e15678e719af Parents: 7947bcf Author: Howard Gao Authored: Wed Jan 31 09:05:47 2018 +0800 Committer: Clebert Suconic Committed: Wed Jan 31 18:05:32 2018 -0500 ---------------------------------------------------------------------- .../api/core/TransportConfiguration.java | 4 + .../client/impl/ClientSessionFactoryImpl.java | 4 + .../core/remoting/ClientProtocolManager.java | 3 + .../client/HornetQClientProtocolManager.java | 17 +++ .../resources/clients/artemisHQClientHA.groovy | 37 +++++++ .../hqclientProtocolTest/failoverTest.groovy | 85 +++++++++++++++ .../resources/servers/node/hornetqServer.groovy | 89 ++++++++++++++++ .../servers/node/hornetqServer_backup.groovy | 91 ++++++++++++++++ .../compatibility/FailoverServerBaseTest.java | 45 ++++++++ .../compatibility/HQClientProtocolTest.java | 84 +++++++++++++++ .../tests/compatibility/VersionedBaseTest.java | 31 ++++++ .../compat/HQClientProtocolManagerTest.java | 104 +++++++++++++++++++ 12 files changed, 594 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index 71679b5..7fcfcd5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -401,4 +401,8 @@ public class TransportConfiguration implements Serializable { private static String replaceWildcardChars(final String str) { return str.replace('.', '-'); } + + public void setFactoryClassName(String factoryClassName) { + this.factoryClassName = factoryClassName; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index e8ac8f8..e2bfd0f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -256,6 +256,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp) { Connector localConnector = connector; + if (backUp != null) { + this.clientProtocolManager.updateTransportConfiguration(backUp); + } + // if the connector has never been used (i.e. the getConnection hasn't been called yet), we will need // to create a connector just to validate if the parameters are ok. // so this will create the instance to be used on the isEquivalent check http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java index e2c9fc1..9788e1e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -78,4 +79,6 @@ public interface ClientProtocolManager { String getName(); + default void updateTransportConfiguration(TransportConfiguration backUp) { + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java index 6a998ef..f0010ea 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -17,12 +17,15 @@ package org.apache.activemq.artemis.core.protocol.hornetq.client; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -64,4 +67,18 @@ public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); } + @Override + public boolean checkForFailover(String liveNodeID) throws ActiveMQException { + //HornetQ doesn't support CheckFailoverMessage packet + return true; + } + + @Override + public void updateTransportConfiguration(TransportConfiguration connector) { + String factoryClassName = connector.getFactoryClassName(); + if ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory".equals(factoryClassName)) { + connector.setFactoryClassName(NettyConnectorFactory.class.getName()); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy new file mode 100644 index 0000000..c179a1b --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/clients/artemisHQClientHA.groovy @@ -0,0 +1,37 @@ +package clients +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Create a client connection factory + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; + +println("serverType " + serverArg[0]); + +if (serverArg[0].startsWith("HORNETQ")) { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); +} else { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&reconnectAttempts=10&confirmationWindowSize=1048576&blockOnDurableSend=false"); +} + + +GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); +GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); +GroovyRun.assertTrue(cf.getServerLocator().isHA()); +GroovyRun.assertEquals(10, cf.getServerLocator().getReconnectAttempts()); + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy new file mode 100644 index 0000000..0dedb96 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqclientProtocolTest/failoverTest.groovy @@ -0,0 +1,85 @@ +package meshTest + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +String serverType = arg[0]; +String clientType = arg[1]; +String operation = arg[2]; + + +String queueName = "queue"; + + +String textBody = "a rapadura e doce mas nao e mole nao"; + +println("clientType " + clientType); +println("serverType " + serverType); + +if (clientType.startsWith("ARTEMIS")) { + // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq + GroovyRun.evaluate("clients/artemisHQClientHA.groovy", "serverArg", serverType); +} else { + throw new RuntimeException("The test is not meant for client type: " + clientType); +} + + +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +Queue queue = session.createQueue(queueName); + +if (operation.equals("failoverTestSend")) { + + CountDownLatch latch = new CountDownLatch(10); + + CompletionListener completionListener = new CompletionListener() { + @Override + void onCompletion(Message message) { + latch.countDown(); + } + + @Override + void onException(Message message, Exception exception) { + + } + } + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage(textBody + i), completionListener); + } + + GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + System.out.println("Message sent"); + + return connection; +} else { + throw new RuntimeException("Invalid operation " + operation); +} + + + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy new file mode 100644 index 0000000..6460850 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer.groovy @@ -0,0 +1,89 @@ +package servers + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts a clustered live hornetq server +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.BroadcastGroupConfiguration +import org.hornetq.api.core.UDPBroadcastGroupConfiguration +import org.hornetq.api.core.DiscoveryGroupConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory +import org.hornetq.core.remoting.impl.netty.TransportConstants +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl +import org.hornetq.jms.server.config.impl.* +import org.hornetq.jms.server.embedded.EmbeddedJMS +import org.hornetq.core.config.ClusterConnectionConfiguration + +String folder = arg[0]; +String id = arg[1]; +String offset = arg[2]; + +configuration = new ConfigurationImpl(); +configuration.setSecurityEnabled(false); +configuration.setJournalDirectory(folder + "/" + id + "/journal"); +configuration.setBindingsDirectory(folder + "/" + id + "/binding"); +configuration.setPagingDirectory(folder + "/" + id + "/paging"); +configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); +configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); +configuration.setPersistenceEnabled(true); +configuration.setFailoverOnServerShutdown(true); + +HashMap map = new HashMap(); +map.put(TransportConstants.HOST_PROP_NAME, "localhost"); +map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); +TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); +configuration.getAcceptorConfigurations().add(tpc); + +TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); +configuration.getConnectorConfigurations().put("netty", connectorConfig); + +ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, + true, + true, + 1, + 1024, + "dg"); +configuration.getClusterConfigurations().add(cc); + +UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); +List connectors = new ArrayList<>(); +connectors.add("netty"); +BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, + connectors, + endpoint); + +configuration.getBroadcastGroupConfigurations().add(bcConfig); + +DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); +configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + +jmsConfiguration = new JMSConfigurationImpl(); + +JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); +TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); + + +jmsConfiguration.getQueueConfigurations().add(queueConfiguration); +jmsConfiguration.getTopicConfigurations().add(topicConfiguration); +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy new file mode 100644 index 0000000..f75f185 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/servers/node/hornetqServer_backup.groovy @@ -0,0 +1,91 @@ +package servers + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts a clustered backup hornetq server +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.BroadcastGroupConfiguration +import org.hornetq.api.core.UDPBroadcastGroupConfiguration +import org.hornetq.api.core.DiscoveryGroupConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory +import org.hornetq.core.remoting.impl.netty.TransportConstants +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl +import org.hornetq.jms.server.config.impl.* +import org.hornetq.jms.server.embedded.EmbeddedJMS +import org.hornetq.core.config.ClusterConnectionConfiguration + +String folder = arg[0]; +String id = arg[1]; +String offset = arg[2]; + +configuration = new ConfigurationImpl(); +configuration.setSecurityEnabled(false); +configuration.setJournalDirectory(folder + "/" + id + "/journal"); +configuration.setBindingsDirectory(folder + "/" + id + "/binding"); +configuration.setPagingDirectory(folder + "/" + id + "/paging"); +configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage"); +configuration.setJournalType(org.hornetq.core.server.JournalType.NIO); +configuration.setPersistenceEnabled(true); + +configuration.setFailoverOnServerShutdown(true); +configuration.setBackup(true); +configuration.setSharedStore(true); + +HashMap map = new HashMap(); +map.put(TransportConstants.HOST_PROP_NAME, "localhost"); +map.put(TransportConstants.PORT_PROP_NAME, (61616 + Integer.parseInt(offset))); +TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map); +configuration.getAcceptorConfigurations().add(tpc); + +TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), map, "netty"); +configuration.getConnectorConfigurations().put("netty", connectorConfig); + +ClusterConnectionConfiguration cc = new ClusterConnectionConfiguration("test-cluster", "jms", "netty", 200, + true, + true, + 1, + 1024, + "dg"); +configuration.getClusterConfigurations().add(cc); + +UDPBroadcastGroupConfiguration endpoint = new UDPBroadcastGroupConfiguration("231.7.7.7", 9876, null, -1); +List connectors = new ArrayList<>(); +connectors.add("netty"); +BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg", 250, + connectors, + endpoint); + +configuration.getBroadcastGroupConfigurations().add(bcConfig); + +DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg", 5000, 5000, endpoint); +configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig); + +jmsConfiguration = new JMSConfigurationImpl(); + +JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true); +TopicConfigurationImpl topicConfiguration = new TopicConfigurationImpl("topic"); + + +jmsConfiguration.getQueueConfigurations().add(queueConfiguration); +jmsConfiguration.getTopicConfigurations().add(topicConfiguration); +backupServer = new EmbeddedJMS(); +backupServer.setConfiguration(configuration); +backupServer.setJmsConfiguration(jmsConfiguration); +backupServer.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java new file mode 100644 index 0000000..55e0331 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/FailoverServerBaseTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; + +public class FailoverServerBaseTest extends VersionedBaseTest { + + protected boolean stopServers = true; + + public FailoverServerBaseTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + setVariable(serverClassloader, "persistent", Boolean.FALSE); + startServerWithBackup(serverFolder.getRoot(), serverClassloader, "live"); + } + + @After + public void tearDown() throws Throwable { + if (stopServers) { + stopServerWithBackup(serverClassloader); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java new file mode 100644 index 0000000..1e7e8de --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQClientProtocolTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class HQClientProtocolTest extends FailoverServerBaseTest { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + + combinations.add(new Object[]{HORNETQ_235, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public HQClientProtocolTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Test + public void FailoverTest() throws Throwable { + ActiveMQConnection conn = (ActiveMQConnection) evaluate(senderClassloader, "hqclientProtocolTest/failoverTest.groovy", server, sender, "failoverTestSend"); + CountDownLatch latch = new CountDownLatch(1); + conn.setFailoverListener(eventType -> { + if (eventType == FailoverEventType.FAILOVER_COMPLETED) { + latch.countDown(); + } + }); + + stopServer(serverClassloader); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + Queue queue = session.createQueue("queue"); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 10; i++) { + Message msg = consumer.receive(5000); + assertNotNull(msg); + } + assertNull(consumer.receiveNoWait()); + + stopBackup(serverClassloader); + stopServers = false; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java index 8dc3302..8f80591 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java @@ -203,7 +203,38 @@ public abstract class VersionedBaseTest { evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver); } + public void startServerWithBackup(File folder, ClassLoader loader, String serverName) throws Throwable { + folder.mkdirs(); + + System.out.println("Folder::" + folder); + + String liveScript; + String backupScript; + String topologyScript; + + if (server.startsWith("ARTEMIS")) { + liveScript = "servers/node/artemisServer.groovy"; + backupScript = "servers/node/artemisServer_backup.groovy"; + topologyScript = null; + } else { + liveScript = "servers/node/hornetqServer.groovy"; + backupScript = "servers/node/hornetqServer_backup.groovy"; + } + + evaluate(loader, liveScript, folder.getAbsolutePath(), serverName, "0"); + evaluate(loader, backupScript, folder.getAbsolutePath(), serverName, "1"); + } + + public void stopServerWithBackup(ClassLoader loader) throws Throwable { + execute(loader, "backupServer.stop()"); + execute(loader, "server.stop()"); + } + public void stopServer(ClassLoader loader) throws Throwable { execute(loader, "server.stop()"); } + + public void stopBackup(ClassLoader loader) throws Throwable { + execute(loader, "backupServer.stop()"); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1693db01/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java new file mode 100644 index 0000000..7bb070a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/compat/HQClientProtocolManagerTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.remoting.compat; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class HQClientProtocolManagerTest extends ActiveMQTestBase { + + @Test + public void testNoCheckFailoverMessage() throws Exception { + final int pingPeriod = 1000; + + ActiveMQServer server = createServer(false, true); + + server.start(); + + ClientSessionInternal session = null; + + try { + ServerLocator locator = createFactory(true).setClientFailureCheckPeriod(pingPeriod).setRetryInterval(500).setRetryIntervalMultiplier(1d).setReconnectAttempts(-1).setConfirmationWindowSize(1024 * 1024); + locator.setProtocolManagerFactory(new HornetQClientProtocolManagerFactory()); + ClientSessionFactory factory = createSessionFactory(locator); + + session = (ClientSessionInternal) factory.createSession(); + + server.stop(); + + Thread.sleep((pingPeriod * 2)); + + List incomings = server.getConfiguration().getIncomingInterceptorClassNames(); + incomings.add(UnsupportedPacketInterceptor.class.getName()); + + server.start(); + + //issue a query to make sure session is reconnected. + ClientSession.QueueQuery query = session.queueQuery(new SimpleString("anyvalue")); + assertFalse(query.isExists()); + + locator.close(); + + UnsupportedPacketInterceptor.checkReceivedTypes(); + } finally { + try { + session.close(); + } catch (Throwable e) { + } + + server.stop(); + } + } + + + public static class UnsupportedPacketInterceptor implements Interceptor { + + private static Set receivedTypes = new HashSet<>(); + private static Set unsupportedTypes = new HashSet<>(); + static { + unsupportedTypes.add(PacketImpl.CHECK_FOR_FAILOVER); + } + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + receivedTypes.add(packet.getType()); + return true; + } + + public static void checkReceivedTypes() throws Exception { + for (Byte type : receivedTypes) { + assertFalse("Received unsupported type: " + type, unsupportedTypes.contains(type)); + } + } + } +}