Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F1DDD1896D for ; Wed, 3 Feb 2016 01:25:51 +0000 (UTC) Received: (qmail 80510 invoked by uid 500); 3 Feb 2016 01:25:51 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 80414 invoked by uid 500); 3 Feb 2016 01:25:51 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 78677 invoked by uid 99); 3 Feb 2016 01:25:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Feb 2016 01:25:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6BC3DE0942; Wed, 3 Feb 2016 01:25:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 03 Feb 2016 01:26:18 -0000 Message-Id: In-Reply-To: <87d3343869e64c8ba025051dd2c6a3b1@git.apache.org> References: <87d3343869e64c8ba025051dd2c6a3b1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [30/31] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index ca1127d..5dacbe9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -21,7 +21,9 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.PrintWriter; import java.io.Serializable; +import java.io.StringWriter; import java.lang.reflect.Array; import java.net.URI; import java.security.AccessController; @@ -1299,6 +1301,8 @@ public class ConfigurationImpl implements Configuration, Serializable { public TransportConfiguration[] getTransportConfigurations(final List connectorNames) { TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size()); int count = 0; + System.out.println(debugConnectors()); + for (String connectorName : connectorNames) { TransportConfiguration connector = getConnectorConfigurations().get(connectorName); @@ -1314,6 +1318,21 @@ public class ConfigurationImpl implements Configuration, Serializable { return tcConfigs; } + public String debugConnectors() { + StringWriter stringWriter = new StringWriter(); + PrintWriter writer = new PrintWriter(stringWriter); + + + for (Map.Entry connector : getConnectorConfigurations().entrySet()) { + writer.println("Connector::" + connector.getKey() + " value = " + connector.getValue()); + } + + writer.close(); + + return stringWriter.toString(); + + } + @Override public boolean isResolveProtocols() { return resolveProtocols; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 345981e..38f3544 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -169,6 +169,8 @@ public class NettyAcceptor implements Acceptor { private final long connectionsAllowed; + private Map extraConfigs; + public NettyAcceptor(final String name, final ClusterConnection clusterConnection, final Map configuration, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 4b83657..50f0cab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -248,7 +248,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle ClusterConnection clusterConnection = lookupClusterConnection(info); - Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, supportedProtocols.isEmpty() ? protocolMap : supportedProtocols); + Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getAllParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, supportedProtocols.isEmpty() ? protocolMap : supportedProtocols); if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java index ef384e0..e3a583f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java @@ -69,6 +69,11 @@ public class EmbeddedActiveMQ { * @return */ public boolean waitClusterForming(long timeWait, TimeUnit unit, int iterations, int servers) throws Exception { + if (activeMQServer.getClusterManager().getClusterConnections() == null || + activeMQServer.getClusterManager().getClusterConnections().size() == 0) { + return servers == 0; + } + for (int i = 0; i < iterations; i++) { for (ClusterConnection connection : activeMQServer.getClusterManager().getClusterConnections()) { if (connection.getTopology().getMembers().size() == servers) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java index 771d89c..3df2ecf 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java @@ -41,7 +41,6 @@ public class ClusterConnectionConfigurationTest { ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); ClusterConnectionConfiguration configuration = parser.newObject(new URI("static://(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;messageLoadBalancingType=OFF"), null); Assert.assertEquals(132, configuration.getMinLargeMessageSize()); - Assert.assertEquals(MessageLoadBalancingType.OFF, configuration.getMessageLoadBalancingType()); Assert.assertEquals(2, configuration.getCompositeMembers().getComponents().length); Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString()); Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml index 5b77b7e..e52b9a6 100644 --- a/tests/activemq5-unit-tests/pom.xml +++ b/tests/activemq5-unit-tests/pom.xml @@ -29,7 +29,7 @@ ${project.basedir}/../.. - 5.11.1 + 5.12.0 3.4.1 1.0.6 2.5.1 @@ -43,6 +43,7 @@ 1.9.2 2.0.0-M6 3.1.4 + 2.2.0 @@ -62,13 +63,6 @@ org.apache.activemq - activemq-broker - ${activemq5.project.version} - test-jar - - - - org.apache.activemq activemq-jdbc-store ${activemq5.project.version} @@ -313,6 +307,35 @@ artemis-openwire-protocol ${project.version} + + org.jboss.byteman + byteman + ${byteman.version} + + + org.jboss.byteman + byteman-submit + test + ${byteman.version} + + + org.jboss.byteman + byteman-install + test + ${byteman.version} + + + org.jboss.byteman + byteman-bmunit + test + ${byteman.version} + + + org.testng + testng + + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/ArtemisBrokerWrapperFactory.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/ArtemisBrokerWrapperFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/ArtemisBrokerWrapperFactory.java new file mode 100644 index 0000000..eff9ab3 --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/ArtemisBrokerWrapperFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker; + +import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; + +import java.util.ArrayList; +import java.util.List; + +public class ArtemisBrokerWrapperFactory { + List brokers = new ArrayList<>(); + + public static Broker createBroker(BrokerService brokerService) { + + return null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java index ffdfc6e..d34f943 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -16,22 +16,25 @@ */ package org.apache.activemq.broker; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.ServerSocket; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.Service; @@ -44,6 +47,8 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.network.ConnectionFilter; +import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.jms.JmsConnector; import org.apache.activemq.proxy.ProxyConnector; @@ -57,6 +62,7 @@ import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.IOExceptionHandler; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ServiceStopper; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,10 +74,12 @@ import org.slf4j.LoggerFactory; public class BrokerService implements Service { public static final String DEFAULT_PORT = "61616"; + public static final AtomicInteger RANDOM_PORT_BASE = new AtomicInteger(51616); public static final String DEFAULT_BROKER_NAME = "localhost"; public static final String BROKER_VERSION; public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; public static final long DEFAULT_START_TIMEOUT = 600000L; + public static boolean disableWrapper = false; public String SERVER_SIDE_KEYSTORE; public String KEYSTORE_PASSWORD; @@ -99,6 +107,11 @@ public class BrokerService implements Service { private PolicyMap destinationPolicy; private SystemUsage systemUsage; + private boolean isClustered = true; + private final List networkConnectors = new CopyOnWriteArrayList(); + + private TemporaryFolder tmpfolder; + public static WeakHashMap map = new WeakHashMap<>(); static { @@ -131,6 +144,10 @@ public class BrokerService implements Service { @Override public void start() throws Exception { + File targetTmp = new File("./target/tmp"); + targetTmp.mkdirs(); + tmpfolder = new TemporaryFolder(targetTmp); + tmpfolder.create(); Exception e = new Exception(); e.fillInStackTrace(); startBroker(startAsync); @@ -188,10 +205,10 @@ public class BrokerService implements Service { LOG.info("Apache ActiveMQ Artemis{} ({}, {}) is shutting down", new Object[]{getBrokerVersion(), getBrokerName(), brokerId}); if (broker != null) { - System.out.println("______________________stopping broker: " + broker.getClass().getName()); broker.stop(); broker = null; } + tmpfolder.delete(); LOG.info("Apache ActiveMQ Artemis {} ({}, {}) is shutdown", new Object[]{getBrokerVersion(), getBrokerName(), brokerId}); } @@ -200,7 +217,7 @@ public class BrokerService implements Service { public Broker getBroker() throws Exception { if (broker == null) { - broker = createBroker(); + broker = createBroker(tmpfolder.getRoot()); } return broker; } @@ -220,13 +237,14 @@ public class BrokerService implements Service { this.brokerName = str.trim(); } - protected Broker createBroker() throws Exception { - broker = createBrokerWrapper(); + protected Broker createBroker(File temporaryFile) throws Exception { + new Exception("file=" + temporaryFile.getAbsolutePath()).printStackTrace(); + broker = createBrokerWrapper(temporaryFile); return broker; } - private Broker createBrokerWrapper() { - return new ArtemisBrokerWrapper(this); + private Broker createBrokerWrapper(File temporaryFile) { + return new ArtemisBrokerWrapper(this, temporaryFile); } public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception { @@ -382,10 +400,6 @@ public class BrokerService implements Service { public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { } - public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { - return null; - } - public TransportConnector getConnectorByName(String connectorName) { return null; } @@ -407,8 +421,17 @@ public class BrokerService implements Service { public void setSchedulerDirectoryFile(File schedulerDirectory) { } + public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { + return addNetworkConnector(new URI(discoveryAddress)); + } + + public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { + NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); + return addNetworkConnector(connector); + } + public List getNetworkConnectors() { - return new ArrayList<>(); + return this.networkConnectors; } public void setSchedulerSupport(boolean schedulerSupport) { @@ -471,6 +494,30 @@ public class BrokerService implements Service { } public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { + connector.setBrokerService(this); + + System.out.println("------------------------ this broker uri: " + this.getConnectURI()); + connector.setLocalUri(this.getConnectURI()); + // Set a connection filter so that the connector does not establish loop + // back connections. + connector.setConnectionFilter(new ConnectionFilter() { + @Override + public boolean connectTo(URI location) { + List transportConnectors = getTransportConnectors(); + for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) { + try { + TransportConnector tc = iter.next(); + if (location.equals(tc.getConnectUri())) { + return false; + } + } catch (Throwable e) { + } + } + return true; + } + }); + + networkConnectors.add(connector); return connector; } @@ -486,19 +533,63 @@ public class BrokerService implements Service { public TransportConnector addConnector(URI bindAddress) throws Exception { Integer port = bindAddress.getPort(); + String host = bindAddress.getHost(); FakeTransportConnector connector = null; - if (port != 0) { - connector = new FakeTransportConnector(bindAddress); - this.transportConnectors.add(connector); - this.extraConnectors.add(port); + + host = (host == null || host.length() == 0) ? "localhost" : host; + if ("0.0.0.0".equals(host)) { + host = "localhost"; } - else { - connector = new FakeTransportConnector(new URI(this.getDefaultUri())); - this.transportConnectors.add(connector); + + if (port == 0) { + //In actual impl in amq5, after connector has been added the socket + //is bound already. This means in case of 0 port uri, the random + //port is available after this call. With artemis wrapper however + //the real binding happens during broker start. To work around this + //we use manually calculated port for that. + port = getPseudoRandomPort(); + } + + System.out.println("Now host is: " + host); + bindAddress = new URI(bindAddress.getScheme(), bindAddress.getUserInfo(), + host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment()); + + connector = new FakeTransportConnector(bindAddress); + this.transportConnectors.add(connector); + this.extraConnectors.add(port); + return connector; } + private int getPseudoRandomPort() { + int port = RANDOM_PORT_BASE.getAndIncrement(); + while (!checkPort(port)) { + port = RANDOM_PORT_BASE.getAndIncrement(); + } + return port; + } + + private static boolean checkPort(final int port) { + ServerSocket ssocket = null; + try { + ssocket = new ServerSocket(port); + } + catch (Exception e) { + return false; + } + finally { + if (ssocket != null) { + try { + ssocket.close(); + } + catch (IOException e) { + } + } + } + return true; + } + public void setCacheTempDestinations(boolean cacheTempDestinations) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java index 5c052a6..fb3c242 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java @@ -17,7 +17,6 @@ package org.apache.activemq.broker.artemiswrapper; import java.io.File; -import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -65,7 +64,6 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.PListStore; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; -import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,20 +81,19 @@ public abstract class ArtemisBrokerBase implements Broker { protected volatile boolean stopped; protected BrokerId brokerId = new BrokerId("Artemis Broker"); protected BrokerService bservice; - protected TemporaryFolder temporaryFolder = new TemporaryFolder(); - protected String testDir; + + protected final File temporaryFolder; + protected final String testDir; protected boolean realStore = false; protected ActiveMQServer server; protected boolean enableSecurity = false; - public ArtemisBrokerBase() { - try { - this.temporaryFolder.create(); - } - catch (IOException e) { - } + public ArtemisBrokerBase(File temporaryFolder) { + this.temporaryFolder = temporaryFolder; + this.testDir = temporaryFolder.getAbsolutePath(); + } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 61d6250..3ad6072 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.artemiswrapper; +import java.io.File; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,20 +47,16 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { protected final Map testQueues = new HashMap<>(); protected JMSServerManagerImpl jmsServer; - public ArtemisBrokerWrapper(BrokerService brokerService) { + public ArtemisBrokerWrapper(BrokerService brokerService, File temporaryFolder) { + super(temporaryFolder); this.bservice = brokerService; } @Override public void start() throws Exception { - testDir = temporaryFolder.getRoot().getAbsolutePath(); clearDataRecreateServerDirs(); server = createServer(realStore, true); server.getConfiguration().getAcceptorConfigurations().clear(); - HashMap params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, "61616"); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE,CORE"); - TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); Configuration serverConfig = server.getConfiguration(); @@ -82,9 +79,11 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { commonSettings.setDeadLetterAddress(dla); commonSettings.setAutoCreateJmsQueues(true); - serverConfig.getAcceptorConfigurations().add(transportConfiguration); + HashMap params = new HashMap(); + if (bservice.extraConnectors.size() == 0) { + serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE"); + } if (this.bservice.enableSsl()) { - params = new HashMap<>(); params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true); params.put(TransportConstants.PORT_PROP_NAME, 61611); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE"); @@ -102,14 +101,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { } for (Integer port : bservice.extraConnectors) { - if (port.intValue() != 61616) { - //extra port - params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, port.intValue()); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE"); - TransportConfiguration extraTransportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - serverConfig.getAcceptorConfigurations().add(extraTransportConfiguration); - } + serverConfig.addAcceptorConfiguration("homePort" + port, "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE"); } serverConfig.setSecurityEnabled(enableSecurity); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java new file mode 100644 index 0000000..be9cf06 --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.artemiswrapper; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; +import org.apache.activemq.artemis.api.jms.management.JMSServerControl; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.BrokerService; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; + +public class OpenwireArtemisBaseTest { + + @Rule + public TemporaryFolder temporaryFolder; + @Rule + public TestName name = new TestName(); + + public OpenwireArtemisBaseTest() { + File tmpRoot = new File("./target/tmp"); + tmpRoot.mkdirs(); + temporaryFolder = new TemporaryFolder(tmpRoot); + //The wrapper stuff will automatically create a default + //server on a normal connection factory, which will + //cause problems with clustering tests, which starts + //all servers explicitly. Setting this to true + //can prevent the auto-creation from happening. + BrokerService.disableWrapper = true; + } + + + public String getTmp() { + return getTmpFile().getAbsolutePath(); + } + + public File getTmpFile() { + return temporaryFolder.getRoot(); + } + + protected String getJournalDir(int serverID, boolean backup) { + return getTmp() + "/journal_" + serverID + "_" + backup; + } + + protected String getBindingsDir(int serverID, boolean backup) { + return getTmp() + "/binding_" + serverID + "_" + backup; + } + + protected String getPageDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected String getLargeMessagesDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + public String CLUSTER_PASSWORD = "OPENWIRECLUSTER"; + + protected Configuration createConfig(final int serverID) throws Exception { + return createConfig("localhost", serverID); + } + + protected Configuration createConfig(final String hostAddress, final int serverID, final int port) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + configuration.addAcceptorConfiguration("netty", newURIwithPort(hostAddress, port)); + configuration.addConnectorConfiguration("netty-connector", newURIwithPort(hostAddress, port)); + + return configuration; + } + + protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID)); + configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID)); + + return configuration; + } + + //extraAcceptor takes form: "?name=value&name1=value ..." + protected Configuration createConfig(final int serverID, String extraAcceptorParams) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + String fullAcceptorUri = newURI(serverID) + extraAcceptorParams; + configuration.addAcceptorConfiguration("netty", fullAcceptorUri); + + configuration.addConnectorConfiguration("netty-connector", newURI(serverID)); + return configuration; + } + + public void deployClusterConfiguration(Configuration config, Integer ... targetIDs) throws Exception { + StringBuffer stringBuffer = new StringBuffer(); + String separator = ""; + for (int x : targetIDs) { + stringBuffer.append(separator + newURI(x)); + separator = ","; + } + + String ccURI = "static://(" + stringBuffer.toString() + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1"; + + config.addClusterConfiguration("clusterCC", ccURI); + } + + protected static String newURI(int serverID) { + return newURI("localhost", serverID); + } + + protected static String newURI(String localhostAddress, int serverID) { + return "tcp://" + localhostAddress + ":" + (61616 + serverID); + } + + protected static String newURIwithPort(String localhostAddress, int port) { + return "tcp://" + localhostAddress + ":" + port; + } + + public static JMSServerControl createJMSServerControl(final MBeanServer mbeanServer) throws Exception { + return (JMSServerControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSServerObjectName(), JMSServerControl.class, mbeanServer); + } + + public static JMSQueueControl createJMSQueueControl(final String name, + final MBeanServer mbeanServer) throws Exception { + return (JMSQueueControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(name), JMSQueueControl.class, mbeanServer); + } + + private static Object createProxy(final ObjectName objectName, + final Class mbeanInterface, + final MBeanServer mbeanServer) { + return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, objectName, mbeanInterface, false); + } + + protected void shutDownClusterServers(EmbeddedJMS[] servers) throws Exception { + for (int i = 0; i < servers.length; i++) { + try { + servers[i].stop(); + } + catch (Throwable t) { + t.printStackTrace(); + } + } + } + + protected void shutDownNonClusterServers(EmbeddedJMS[] servers) throws Exception { + shutDownClusterServers(servers); + } + + protected void setUpNonClusterServers(EmbeddedJMS[] servers) throws Exception { + + Configuration[] serverCfgs = new Configuration[servers.length]; + for (int i = 0; i < servers.length; i++) { + serverCfgs[i] = createConfig(i); + } + + for (int i = 0; i < servers.length; i++) { + servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl()); + } + + for (int i = 0; i < servers.length; i++) { + servers[i].start(); + } + } + + protected void setUpClusterServers(EmbeddedJMS[] servers) throws Exception { + + Configuration[] serverCfgs = new Configuration[servers.length]; + for (int i = 0; i < servers.length; i++) { + serverCfgs[i] = createConfig(i); + } + + for (int i = 0; i < servers.length; i++) { + deployClusterConfiguration(serverCfgs[i], getTargets(servers.length, i)); + } + + for (int i = 0; i < servers.length; i++) { + servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl()); + } + + for (int i = 0; i < servers.length; i++) { + servers[i].start(); + } + + for (int i = 0; i < servers.length; i++) { + Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length)); + } + } + + private Integer[] getTargets(int total, int self) + { + int lenTargets = total - self; + List targets = new ArrayList<>(); + for (int i = 0; i < lenTargets; i++) { + if (i != self) { + targets.add(i); + } + } + return targets.toArray(new Integer[0]); + } + + public EmbeddedJMS createBroker() throws Exception { + Configuration config0 = createConfig(0); + EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + return newbroker; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 34babf8..0843d3a 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,6 +30,7 @@ import javax.net.SocketFactory; import org.apache.activemq.TransportLoggerSupport; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.*; import org.apache.activemq.util.IOExceptionSupport; @@ -54,11 +56,10 @@ public class TcpTransportFactory extends TransportFactory { //here check broker, if no broker, we start one Map params = URISupport.parseParameters(location); String brokerId = params.remove("invmBrokerId"); - params.clear(); - location = URISupport.createRemainingURI(location, params); - if (brokerService == null) { + URI location1 = URISupport.createRemainingURI(location, Collections.EMPTY_MAP); + if (brokerService == null && !BrokerService.disableWrapper) { - ArtemisBrokerHelper.startArtemisBroker(location); + ArtemisBrokerHelper.startArtemisBroker(location1); brokerService = location.toString(); if (brokerId != null) { @@ -66,7 +67,8 @@ public class TcpTransportFactory extends TransportFactory { System.out.println("bound: " + brokerId); } } - return super.doConnect(location); + URI location2 = URISupport.createRemainingURI(location, params); + return super.doConnect(location2); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java deleted file mode 100644 index fd06de9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.io.InputStream; -import java.io.OutputStream; - -import javax.jms.Queue; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated -public class ActiveMQInputStreamTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class); - - private static final String BROKER_URL = "tcp://localhost:0"; - private static final String DESTINATION = "destination"; - private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash - - private BrokerService broker; - private String connectionUri; - - @Override - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setPersistent(false); - broker.setDestinations(new ActiveMQDestination[]{ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),}); - broker.addConnector(BROKER_URL); - broker.start(); - broker.waitUntilStarted(); - - //some internal api we don't implement - connectionUri = broker.getDefaultUri(); - } - - @Override - public void tearDown() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - public void testInputStreamSetSyncSendOption() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true"); - - OutputStream out = null; - try { - out = connection.createOutputStream(destination); - - assertTrue(((ActiveMQOutputStream) out).isAlwaysSyncSend()); - - LOG.debug("writing..."); - for (int i = 0; i < STREAM_LENGTH; ++i) { - out.write(0); - } - LOG.debug("wrote " + STREAM_LENGTH + " bytes"); - } - finally { - if (out != null) { - out.close(); - } - } - - InputStream in = null; - try { - in = connection.createInputStream(destination); - LOG.debug("reading..."); - int count = 0; - while (-1 != in.read()) { - ++count; - } - LOG.debug("read " + count + " bytes"); - } - finally { - if (in != null) { - in.close(); - } - } - - connection.close(); - } - - public void testInputStreamMatchesDefaultChuckSize() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(DESTINATION); - - OutputStream out = null; - try { - out = connection.createOutputStream(destination); - LOG.debug("writing..."); - for (int i = 0; i < STREAM_LENGTH; ++i) { - out.write(0); - } - LOG.debug("wrote " + STREAM_LENGTH + " bytes"); - } - finally { - if (out != null) { - out.close(); - } - } - - InputStream in = null; - try { - in = connection.createInputStream(destination); - LOG.debug("reading..."); - int count = 0; - while (-1 != in.read()) { - ++count; - } - LOG.debug("read " + count + " bytes"); - } - finally { - if (in != null) { - in.close(); - } - } - - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java new file mode 100644 index 0000000..f47620f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.TestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Enforces a test case to run for only an allotted time to prevent them from + * hanging and breaking the whole testing. + * + * + */ + +public abstract class AutoFailTestSupport extends TestCase { + public static final int EXIT_SUCCESS = 0; + public static final int EXIT_ERROR = 1; + private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class); + + private long maxTestTime = 5 * 60 * 1000; // 5 mins by default + private Thread autoFailThread; + + private boolean verbose = true; + private boolean useAutoFail; // Disable auto fail by default + private AtomicBoolean isTestSuccess; + + protected void setUp() throws Exception { + // Runs the auto fail thread before performing any setup + if (isAutoFail()) { + startAutoFailThread(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + + // Stops the auto fail thread only after performing any clean up + stopAutoFailThread(); + } + + /** + * Manually start the auto fail thread. To start it automatically, just set + * the auto fail to true before calling any setup methods. As a rule, this + * method is used only when you are not sure, if the setUp and tearDown + * method is propagated correctly. + */ + public void startAutoFailThread() { + setAutoFail(true); + isTestSuccess = new AtomicBoolean(false); + autoFailThread = new Thread(new Runnable() { + public void run() { + try { + // Wait for test to finish succesfully + Thread.sleep(getMaxTestTime()); + } catch (InterruptedException e) { + // This usually means the test was successful + } finally { + // Check if the test was able to tear down succesfully, + // which usually means, it has finished its run. + if (!isTestSuccess.get()) { + LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms."); + dumpAllThreads(getName()); + if (System.getProperty("org.apache.activemq.AutoFailTestSupport.disableSystemExit") == null) { + System.exit(EXIT_ERROR); + } else { + LOG.error("No system.exit as it kills surefire - forkedProcessTimeoutInSeconds (surefire.timeout) will kick in eventually see pom.xml surefire plugin config"); + } + } + } + } + }, "AutoFailThread"); + + if (verbose) { + LOG.info("Starting auto fail thread..."); + } + + LOG.info("Starting auto fail thread..."); + autoFailThread.start(); + } + + /** + * Manually stops the auto fail thread. As a rule, this method is used only + * when you are not sure, if the setUp and tearDown method is propagated + * correctly. + */ + public void stopAutoFailThread() { + if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) { + isTestSuccess.set(true); + + if (verbose) { + LOG.info("Stopping auto fail thread..."); + } + + LOG.info("Stopping auto fail thread..."); + autoFailThread.interrupt(); + } + } + + /** + * Sets the auto fail value. As a rule, this should be used only before any + * setup methods is called to automatically enable the auto fail thread in + * the setup method of the test case. + * + * @param val + */ + public void setAutoFail(boolean val) { + this.useAutoFail = val; + } + + public boolean isAutoFail() { + return this.useAutoFail; + } + + /** + * The assigned value will only be reflected when the auto fail thread has + * started its run. Value is in milliseconds. + * + * @param val + */ + public void setMaxTestTime(long val) { + this.maxTestTime = val; + } + + public long getMaxTestTime() { + return this.maxTestTime; + } + + + public static void dumpAllThreads(String prefix) { + Map stacks = Thread.getAllStackTraces(); + for (Entry stackEntry : stacks.entrySet()) { + System.err.println(prefix + " " + stackEntry.getKey()); + for(StackTraceElement element : stackEntry.getValue()) { + System.err.println(" " + element); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java index 5e5b993..b8397e2 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java @@ -52,22 +52,50 @@ public class ConnectionCleanupTest extends TestCase { try { connection.setClientID("test"); - // fail("Should have received JMSException"); + fail("Should have received JMSException"); } catch (JMSException e) { } - connection.cleanup(); + connection.doCleanup(true); connection.setClientID("test"); connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { connection.setClientID("test"); - // fail("Should have received JMSException"); + fail("Should have received JMSException"); } catch (JMSException e) { } } + public void testChangeClientIDDenied() throws JMSException { + + connection.setClientID("test"); + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + + connection.cleanup(); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java index fa58ebe..b8dea70 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java @@ -16,15 +16,23 @@ */ package org.apache.activemq; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.junit.rules.TemporaryFolder; import org.springframework.jms.core.JmsTemplate; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import java.io.File; /** * A useful base class which creates and closes an embedded broker @@ -32,17 +40,26 @@ import javax.jms.Destination; public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { protected BrokerService broker; - // protected String bindAddress = "tcp://localhost:61616"; - protected String bindAddress = "vm://localhost"; + protected EmbeddedJMS artemisBroker; + protected String bindAddress = "tcp://localhost:61616"; protected ConnectionFactory connectionFactory; protected boolean useTopic; protected ActiveMQDestination destination; protected JmsTemplate template; - @Override + public TemporaryFolder temporaryFolder; + + public String CLUSTER_PASSWORD = "OPENWIRECLUSTER"; + protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); + BrokerService.disableWrapper = true; + File tmpRoot = new File("./target/tmp"); + tmpRoot.mkdirs(); + temporaryFolder = new TemporaryFolder(tmpRoot); + temporaryFolder.create(); + + if (artemisBroker == null) { + artemisBroker = createArtemisBroker(); } startBroker(); @@ -58,13 +75,42 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { @Override protected void tearDown() throws Exception { - if (broker != null) { + if (artemisBroker != null) { try { - broker.stop(); + artemisBroker.stop(); } catch (Exception e) { } } + temporaryFolder.delete(); + } + + public String getTmp() { + return getTmpFile().getAbsolutePath(); + } + + public File getTmpFile() { + return temporaryFolder.getRoot(); + } + + protected String getJournalDir(int serverID, boolean backup) { + return getTmp() + "/journal_" + serverID + "_" + backup; + } + + protected String getBindingsDir(int serverID, boolean backup) { + return getTmp() + "/binding_" + serverID + "_" + backup; + } + + protected String getPageDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected String getLargeMessagesDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected static String newURI(String localhostAddress, int serverID) { + return "tcp://" + localhostAddress + ":" + (61616 + serverID); } /** @@ -114,20 +160,44 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { return new ActiveMQConnectionFactory(bindAddress); } - /** - * Factory method to create a new broker - * - * @throws Exception - */ + + public EmbeddedJMS createArtemisBroker() throws Exception { + Configuration config0 = createConfig("localhost", 0); + EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + return newbroker; + } + + protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID)); + configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID)); + + return configuration; + } + + //we keep this because some other tests uses it. + //we'll delete this when those tests are dealt with. protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); answer.setPersistent(isPersistent()); + answer.getManagementContext().setCreateConnector(false); answer.addConnector(bindAddress); return answer; } protected void startBroker() throws Exception { - broker.start(); + artemisBroker.start(); } /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d4152808/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java new file mode 100755 index 0000000..b7c2e94 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.ArrayList; +import java.util.Enumeration; + +import org.apache.activemq.test.JmsResourceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsQueueTransactionTest extends JmsTransactionTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class); + + /** + * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider() + */ + protected JmsResourceProvider getJmsResourceProvider() { + JmsResourceProvider p = new JmsResourceProvider(); + p.setTopic(false); + return p; + } + + /** + * Tests if the the connection gets reset, the messages will still be + * received. + * + * @throws Exception + */ + public void testReceiveTwoThenCloseConnection() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from previous test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + beginTx(); + Message message = consumer.receive(2000); + assertEquals(outbound[0], message); + + message = consumer.receive(2000); + assertNotNull(message); + assertEquals(outbound[1], message); + + // Close and reopen connection. + reconnect(); + + // Consume again.. the previous message should + // get redelivered. + beginTx(); + message = consumer.receive(2000); + assertNotNull("Should have re-received the first message again!", message); + messages.add(message); + assertEquals(outbound[0], message); + + message = consumer.receive(5000); + assertNotNull("Should have re-received the second message again!", message); + messages.add(message); + assertEquals(outbound[1], message); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Tests sending and receiving messages with two sessions(one for producing + * and another for consuming). + * + * @throws Exception + */ + public void testSendReceiveInSeperateSessionTest() throws Exception { + session.close(); + int batchCount = 10; + + for (int i = 0; i < batchCount; i++) { + // Session that sends messages + { + Session session = resourceProvider.createSession(connection); + this.session = session; + MessageProducer producer = resourceProvider.createProducer(session, destination); + // consumer = resourceProvider.createConsumer(session, + // destination); + beginTx(); + producer.send(session.createTextMessage("Test Message: " + i)); + commitTx(); + session.close(); + } + + // Session that consumes messages + { + Session session = resourceProvider.createSession(connection); + this.session = session; + MessageConsumer consumer = resourceProvider.createConsumer(session, destination); + + beginTx(); + TextMessage message = (TextMessage)consumer.receive(1000 * 5); + assertNotNull("Received only " + i + " messages in batch ", message); + assertEquals("Test Message: " + i, message.getText()); + + commitTx(); + session.close(); + } + } + } + + /** + * Tests the queue browser. Browses the messages then the consumer tries to + * receive them. The messages should still be in the queue even when it was + * browsed. + * + * @throws Exception + */ + public void testReceiveBrowseReceive() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")}; + + // lets consume any outstanding messages from previous test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + producer.send(outbound[2]); + commitTx(); + + // Get the first. + beginTx(); + assertEquals(outbound[0], consumer.receive(1000)); + consumer.close(); + commitTx(); + + beginTx(); + QueueBrowser browser = session.createBrowser((Queue)destination); + Enumeration enumeration = browser.getEnumeration(); + + // browse the second + assertTrue("should have received the second message", enumeration.hasMoreElements()); + assertEquals(outbound[1], (Message)enumeration.nextElement()); + + // browse the third. + assertTrue("Should have received the third message", enumeration.hasMoreElements()); + assertEquals(outbound[2], (Message)enumeration.nextElement()); + + LOG.info("Check for more..."); + // There should be no more. + boolean tooMany = false; + while (enumeration.hasMoreElements()) { + LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText()); + tooMany = true; + } + assertFalse(tooMany); + LOG.info("close browser..."); + browser.close(); + + LOG.info("reopen and consume..."); + // Re-open the consumer. + consumer = resourceProvider.createConsumer(session, destination); + // Receive the second. + assertEquals(outbound[1], consumer.receive(1000)); + // Receive the third. + assertEquals(outbound[2], consumer.receive(1000)); + consumer.close(); + + commitTx(); + } + + public void testCloseConsumer() throws Exception { + Destination dest = session.createQueue(getSubject() + "?consumer.prefetchSize=0"); + producer = session.createProducer(dest); + beginTx(); + producer.send(session.createTextMessage("message 1")); + producer.send(session.createTextMessage("message 2")); + commitTx(); + + beginTx(); + consumer = session.createConsumer(dest); + Message message1 = consumer.receive(1000); + String text1 = ((TextMessage)message1).getText(); + assertNotNull(message1); + assertEquals("message 1", text1); + + consumer.close(); + + consumer = session.createConsumer(dest); + + Message message2 = consumer.receive(1000); + String text2 = ((TextMessage)message2).getText(); + assertNotNull(message2); + assertEquals("message 2", text2); + commitTx(); + } + +}