Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3BA4518A35 for ; Wed, 9 Mar 2016 19:42:27 +0000 (UTC) Received: (qmail 4911 invoked by uid 500); 9 Mar 2016 19:42:26 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 4792 invoked by uid 500); 9 Mar 2016 19:42:26 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 3196 invoked by uid 99); 9 Mar 2016 19:42:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Mar 2016 19:42:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6295FDFD43; Wed, 9 Mar 2016 19:42:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 09 Mar 2016 19:42:43 -0000 Message-Id: In-Reply-To: <3b2eeb12253e4e0db5244b3974968001@git.apache.org> References: <3b2eeb12253e4e0db5244b3974968001@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [20/58] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java deleted file mode 100644 index 7556def..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.fail; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.transport.stomp.Stomp; -import org.apache.activemq.transport.stomp.StompConnection; -import org.apache.activemq.util.DefaultTestAppender; -import org.apache.log4j.Appender; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ3622Test { - - protected BrokerService broker; - protected AtomicBoolean failed = new AtomicBoolean(false); - protected String connectionUri; - protected Appender appender = new DefaultTestAppender() { - - @Override - public void doAppend(LoggingEvent event) { - System.err.println(event.getMessage()); - if (event.getThrowableInformation() != null) { - if (event.getThrowableInformation().getThrowable() instanceof NullPointerException) { - failed.set(true); - } - } - } - }; - - @Before - public void before() throws Exception { - Logger.getRootLogger().addAppender(appender); - - broker = new BrokerService(); - broker.setDataDirectory("target" + File.separator + "activemq-data"); - broker.setPersistent(true); - broker.setDeleteAllMessagesOnStartup(true); - PolicyEntry policy = new PolicyEntry(); - policy.setTopic(">"); - policy.setProducerFlowControl(false); - policy.setMemoryLimit(1 * 1024 * 1024); - policy.setPendingSubscriberPolicy(new FilePendingSubscriberMessageStoragePolicy()); - policy.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy()); - policy.setExpireMessagesPeriod(500); - List entries = new ArrayList<>(); - - entries.add(policy); - PolicyMap pMap = new PolicyMap(); - pMap.setPolicyEntries(entries); - broker.setDestinationPolicy(pMap); - - connectionUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString(); - - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void after() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - Logger.getRootLogger().removeAppender(appender); - } - - @Test - public void go() throws Exception { - StompConnection connection = new StompConnection(); - Integer port = Integer.parseInt(connectionUri.split(":")[2]); - connection.open("localhost", port); - connection.connect("", ""); - connection.subscribe("/topic/foobar", Stomp.Headers.Subscribe.AckModeValues.CLIENT); - connection.disconnect(); - Thread.sleep(1000); - - if (failed.get()) { - fail("Received NullPointerException"); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java deleted file mode 100644 index 188b48c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.DefaultTestAppender; -import org.apache.log4j.Appender; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * - */ - -public class AMQ3625Test { - - protected BrokerService broker1; - protected BrokerService broker2; - - protected AtomicBoolean authenticationFailed = new AtomicBoolean(false); - protected AtomicBoolean gotNPE = new AtomicBoolean(false); - - protected String java_security_auth_login_config = "java.security.auth.login.config"; - protected String xbean = "xbean:"; - protected String base = "src/test/resources/org/apache/activemq/bugs/amq3625"; - protected String conf = "conf"; - protected String keys = "keys"; - protected String JaasStompSSLBroker1_xml = "JaasStompSSLBroker1.xml"; - protected String JaasStompSSLBroker2_xml = "JaasStompSSLBroker2.xml"; - - protected String oldLoginConf = null; - - @Before - public void before() throws Exception { - if (System.getProperty(java_security_auth_login_config) != null) { - oldLoginConf = System.getProperty(java_security_auth_login_config); - } - System.setProperty(java_security_auth_login_config, base + "/" + conf + "/" + "login.config"); - broker1 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker1_xml); - broker2 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker2_xml); - - broker1.start(); - broker1.waitUntilStarted(); - broker2.start(); - broker2.waitUntilStarted(); - } - - @After - public void after() throws Exception { - broker1.stop(); - broker2.stop(); - - if (oldLoginConf != null) { - System.setProperty(java_security_auth_login_config, oldLoginConf); - } - } - - @Test - public void go() throws Exception { - Appender appender = new DefaultTestAppender() { - @Override - public void doAppend(LoggingEvent event) { - if (event.getThrowableInformation() != null) { - Throwable t = event.getThrowableInformation().getThrowable(); - if (t instanceof SecurityException) { - authenticationFailed.set(true); - } - if (t instanceof NullPointerException) { - gotNPE.set(true); - } - } - } - }; - Logger.getRootLogger().addAppender(appender); - - String connectURI = broker1.getConnectorByName("openwire").getConnectUri().toString(); - connectURI = connectURI.replace("?needClientAuth=true", ""); - broker2.addNetworkConnector("static:(" + connectURI + ")").start(); - - Thread.sleep(10 * 1000); - - Logger.getRootLogger().removeAppender(appender); - - assertTrue(authenticationFailed.get()); - assertFalse(gotNPE.get()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java deleted file mode 100644 index c691c42..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3674Test { - - private static Logger LOG = LoggerFactory.getLogger(AMQ3674Test.class); - - private final static int deliveryMode = DeliveryMode.NON_PERSISTENT; - private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ"); - - private ActiveMQConnectionFactory factory; - private BrokerService broker; - - @Test - public void removeSubscription() throws Exception { - - final Connection producerConnection = factory.createConnection(); - producerConnection.start(); - final Connection consumerConnection = factory.createConnection(); - - consumerConnection.setClientID("subscriber1"); - Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - TopicSubscriber activeConsumer = consumerMQSession.createDurableSubscriber(destination, "myTopic"); - consumerConnection.start(); - - Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - - final BrokerView brokerView = broker.getAdminView(); - - assertEquals(1, brokerView.getDurableTopicSubscribers().length); - - LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length); - - try { - brokerView.destroyDurableSubscriber("subscriber1", "myTopic"); - fail("Expected Exception for Durable consumer is in use"); - } - catch (Exception e) { - LOG.info("Received expected exception: " + e.getMessage()); - } - - LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length); - - assertEquals(1, brokerView.getDurableTopicSubscribers().length); - - activeConsumer.close(); - consumerConnection.stop(); - - assertTrue("The subscription should be in the inactive state.", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return brokerView.getInactiveDurableTopicSubscribers().length == 1; - } - })); - - try { - brokerView.destroyDurableSubscriber("subscriber1", "myTopic"); - } - finally { - producer.close(); - producerConnection.close(); - } - } - - @Before - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(true); - broker.setDeleteAllMessagesOnStartup(true); - TransportConnector connector = broker.addConnector("tcp://localhost:0"); - broker.start(); - - factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString()); - factory.setAlwaysSyncSend(true); - factory.setDispatchAsync(false); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java deleted file mode 100644 index 6815923..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TopicSubscriber; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.broker.jmx.TopicViewMBean; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3675Test { - - private static Logger LOG = LoggerFactory.getLogger(AMQ3675Test.class); - - private final static int deliveryMode = DeliveryMode.NON_PERSISTENT; - private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ"); - - private ActiveMQConnectionFactory factory; - private BrokerService broker; - - public TopicViewMBean getTopicView() throws Exception { - ObjectName destinationName = broker.getAdminView().getTopics()[0]; - TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true); - return topicView; - } - - @Test - public void countConsumers() throws Exception { - - final Connection producerConnection = factory.createConnection(); - producerConnection.start(); - final Connection consumerConnection = factory.createConnection(); - - consumerConnection.setClientID("subscriber1"); - Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - TopicSubscriber consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic"); - consumerConnection.start(); - - Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - - final BrokerView brokerView = broker.getAdminView(); - final TopicViewMBean topicView = getTopicView(); - - assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return topicView.getConsumerCount() == 1; - } - })); - - consumer.close(); - - assertTrue("Durable consumer should now be inactive.", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return brokerView.getInactiveDurableTopicSubscribers().length == 1; - } - })); - - try { - brokerView.removeTopic(destination.getTopicName()); - } - catch (Exception e1) { - fail("Unable to remove destination:" + destination.getPhysicalName()); - } - - assertTrue("Should have no topics on the broker", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return brokerView.getTopics().length == 0; - } - })); - - try { - brokerView.destroyDurableSubscriber("subscriber1", "myTopic"); - } - catch (Exception e) { - fail("Exception not expected when attempting to delete Durable consumer."); - } - - assertTrue("Should be no durable consumers active or inactive.", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return brokerView.getInactiveDurableTopicSubscribers().length == 0 && brokerView.getDurableTopicSubscribers().length == 0; - } - })); - - consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic"); - - consumer.close(); - - assertTrue("Should be one consumer on the Topic.", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("Number of inactive consumers: " + brokerView.getInactiveDurableTopicSubscribers().length); - return brokerView.getInactiveDurableTopicSubscribers().length == 1; - } - })); - - final TopicViewMBean recreatedTopicView = getTopicView(); - - assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return recreatedTopicView.getConsumerCount() == 1; - } - })); - } - - @Before - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(true); - broker.setAdvisorySupport(false); - broker.setDeleteAllMessagesOnStartup(true); - TransportConnector connector = broker.addConnector("tcp://localhost:0"); - broker.start(); - - factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString()); - factory.setAlwaysSyncSend(true); - factory.setDispatchAsync(false); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java deleted file mode 100644 index 26bef7d..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.ServerSocket; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQTopicSubscriber; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.ManagementContext; -import org.apache.activemq.command.ActiveMQTopic; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.fail; - -public class AMQ3678Test implements MessageListener { - - public int deliveryMode = DeliveryMode.NON_PERSISTENT; - - private BrokerService broker; - - AtomicInteger messagesSent = new AtomicInteger(0); - AtomicInteger messagesReceived = new AtomicInteger(0); - - ActiveMQTopic destination = new ActiveMQTopic("XYZ"); - - int port; - int jmxport; - - final CountDownLatch latch = new CountDownLatch(2); - - public static void main(String[] args) throws Exception { - - } - - public static int findFreePort() throws IOException { - ServerSocket socket = null; - - try { - // 0 is open a socket on any free port - socket = new ServerSocket(0); - return socket.getLocalPort(); - } - finally { - if (socket != null) { - socket.close(); - } - } - } - - @Test - public void countConsumers() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + port); - factory.setAlwaysSyncSend(true); - factory.setDispatchAsync(false); - - final Connection producerConnection = factory.createConnection(); - producerConnection.start(); - - final Connection consumerConnection = factory.createConnection(); - - consumerConnection.setClientID("subscriber1"); - Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination, "myTopic?consumer.prefetchSize=1"); - - activeConsumer.setMessageListener(this); - - consumerConnection.start(); - - final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageProducer producer = producerSession.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - - Thread t = new Thread(new Runnable() { - - private boolean done = false; - - @Override - public void run() { - while (!done) { - if (messagesSent.get() == 50) { - try { - broker.getAdminView().removeTopic(destination.getTopicName()); - } - catch (Exception e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - System.err.flush(); - fail("Unable to remove destination:" + destination.getPhysicalName()); - } - } - - try { - producer.send(producerSession.createTextMessage()); - int val = messagesSent.incrementAndGet(); - - System.out.println("sent message (" + val + ")"); - System.out.flush(); - - if (val == 100) { - done = true; - latch.countDown(); - producer.close(); - producerSession.close(); - - } - } - catch (JMSException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - }); - - t.start(); - - try { - if (!latch.await(10, TimeUnit.SECONDS)) { - fail("did not receive all the messages"); - } - } - catch (InterruptedException e) { - // TODO Auto-generated catch block - fail("did not receive all the messages, exception waiting for latch"); - e.printStackTrace(); - } - - // - - } - - @Before - public void setUp() throws Exception { - - try { - port = findFreePort(); - jmxport = findFreePort(); - } - catch (Exception e) { - fail("Unable to obtain a free port on which to start the broker"); - } - - System.out.println("Starting broker"); - System.out.flush(); - broker = new BrokerService(); - broker.setPersistent(false); - ManagementContext ctx = new ManagementContext(ManagementFactory.getPlatformMBeanServer()); - ctx.setConnectorPort(jmxport); - broker.setManagementContext(ctx); - broker.setUseJmx(true); - // broker.setAdvisorySupport(false); - // broker.setDeleteAllMessagesOnStartup(true); - - broker.addConnector("tcp://localhost:" + port).setName("Default"); - broker.start(); - - System.out.println("End of Broker Setup"); - System.out.flush(); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - @Override - public void onMessage(Message message) { - try { - message.acknowledge(); - int val = messagesReceived.incrementAndGet(); - System.out.println("received message (" + val + ")"); - System.out.flush(); - if (messagesReceived.get() == 100) { - latch.countDown(); - } - } - catch (JMSException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java deleted file mode 100644 index d0f6692..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; - -import java.util.Random; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3732Test { - - private static Logger LOG = LoggerFactory.getLogger(AMQ3732Test.class); - - private ActiveMQConnectionFactory connectionFactory; - private Connection connection; - private Session session; - private BrokerService broker; - private String connectionUri; - - private final Random pause = new Random(); - private final long NUM_MESSAGES = 25000; - private final AtomicLong totalConsumed = new AtomicLong(); - - @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.addConnector("tcp://0.0.0.0:0"); - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - connectionFactory.getPrefetchPolicy().setAll(0); - } - - @After - public void stopBroker() throws Exception { - connection.close(); - - broker.stop(); - broker.waitUntilStopped(); - } - - @Test(timeout = 1200000) - public void testInterruptionAffects() throws Exception { - - connection = connectionFactory.createConnection(); - connection.start(); - session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); - - Queue queue = session.createQueue("AMQ3732Test"); - - final LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); - - final MessageConsumer consumer1 = session.createConsumer(queue); - final MessageConsumer consumer2 = session.createConsumer(queue); - final MessageProducer producer = session.createProducer(queue); - - Thread consumer1Thread = new Thread(new Runnable() { - - @Override - public void run() { - try { - while (totalConsumed.get() < NUM_MESSAGES) { - Message message = consumer1.receiveNoWait(); - if (message != null) { - workQueue.add(message); - } - } - } - catch (Exception e) { - LOG.error("Caught an unexpected error: ", e); - } - } - }); - consumer1Thread.start(); - - Thread consumer2Thread = new Thread(new Runnable() { - - @Override - public void run() { - try { - while (totalConsumed.get() < NUM_MESSAGES) { - Message message = consumer2.receive(50); - if (message != null) { - workQueue.add(message); - } - } - } - catch (Exception e) { - LOG.error("Caught an unexpected error: ", e); - } - } - }); - consumer2Thread.start(); - - Thread producerThread = new Thread(new Runnable() { - - @Override - public void run() { - try { - for (int i = 0; i < NUM_MESSAGES; ++i) { - producer.send(session.createTextMessage("TEST")); - TimeUnit.MILLISECONDS.sleep(pause.nextInt(10)); - } - } - catch (Exception e) { - LOG.error("Caught an unexpected error: ", e); - } - } - }); - producerThread.start(); - - Thread ackingThread = new Thread(new Runnable() { - - @Override - public void run() { - try { - while (totalConsumed.get() < NUM_MESSAGES) { - Message message = workQueue.take(); - message.acknowledge(); - totalConsumed.incrementAndGet(); - if ((totalConsumed.get() % 100) == 0) { - LOG.info("Consumed " + totalConsumed.get() + " messages so far."); - } - } - } - catch (Exception e) { - LOG.error("Caught an unexpected error: ", e); - } - } - }); - ackingThread.start(); - - producerThread.join(); - consumer1Thread.join(); - consumer2Thread.join(); - ackingThread.join(); - - assertEquals(NUM_MESSAGES, totalConsumed.get()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java deleted file mode 100644 index fa354c9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.util.LoggingBrokerPlugin; -import org.apache.activemq.util.DefaultTestAppender; -import org.apache.log4j.Appender; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.LoggingEvent; - -public class AMQ3779Test extends AutoFailTestSupport { - - private static final Logger logger = Logger.getLogger(AMQ3779Test.class); - private static final String qName = "QNameToFind"; - - public void testLogPerDest() throws Exception { - - final AtomicBoolean ok = new AtomicBoolean(false); - Appender appender = new DefaultTestAppender() { - @Override - public void doAppend(LoggingEvent event) { - if (event.getLoggerName().toString().contains(qName)) { - ok.set(true); - } - } - }; - Logger.getRootLogger().addAppender(appender); - - try { - - BrokerService broker = new BrokerService(); - LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin(); - loggingBrokerPlugin.setPerDestinationLogger(true); - loggingBrokerPlugin.setLogAll(true); - broker.setPlugins(new LoggingBrokerPlugin[]{loggingBrokerPlugin}); - broker.start(); - - Connection connection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer messageProducer = session.createProducer(session.createQueue(qName)); - messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - connection.start(); - - messageProducer.send(session.createTextMessage("Hi")); - connection.close(); - - assertTrue("got expected log message", ok.get()); - } - finally { - logger.removeAppender(appender); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java deleted file mode 100644 index 4824855..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3841Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ3841Test.class); - private final static int maxFileLength = 1024 * 1024 * 32; - private final static String destinationName = "TEST.QUEUE"; - BrokerService broker; - - @Before - public void setUp() throws Exception { - prepareBrokerWithMultiStore(true); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { - BrokerService broker = new BrokerService(); - broker.setUseJmx(true); - broker.setBrokerName("localhost"); - broker.setPersistenceAdapter(kaha); - return broker; - } - - @Test - public void testRestartAfterQueueDelete() throws Exception { - - // Ensure we have an Admin View. - assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return (broker.getAdminView()) != null; - } - })); - - broker.getAdminView().addQueue(destinationName); - - assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); - - broker.getAdminView().removeQueue(destinationName); - - broker.stop(); - broker.waitUntilStopped(); - - prepareBrokerWithMultiStore(false); - broker.start(); - - broker.getAdminView().addQueue(destinationName); - assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); - - } - - protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - kaha.setJournalMaxFileLength(maxFileLength); - kaha.setCleanupInterval(5000); - if (delete) { - kaha.deleteAllMessages(); - } - return kaha; - } - - public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { - - MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); - if (deleteAllMessages) { - multiKahaDBPersistenceAdapter.deleteAllMessages(); - } - ArrayList adapters = new ArrayList<>(); - - FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); - template.setPersistenceAdapter(createStore(deleteAllMessages)); - template.setPerDestination(true); - adapters.add(template); - - multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); - broker = createBroker(multiKahaDBPersistenceAdapter); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java deleted file mode 100644 index 071897c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3879Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ3841Test.class); - private BrokerService broker; - - private ActiveMQConnectionFactory factory; - - @Before - public void setUp() throws Exception { - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - - factory = new ActiveMQConnectionFactory("vm://localhost"); - factory.setAlwaysSyncSend(true); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setBrokerName("localhost"); - broker.addConnector("vm://localhost"); - return broker; - } - - @Test - public void testConnectionDletesWrongTempDests() throws Exception { - - final Connection connection1 = factory.createConnection(); - final Connection connection2 = factory.createConnection(); - - Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination tempDestAdvisory = AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC; - - MessageConsumer advisoryConsumer = session1.createConsumer(tempDestAdvisory); - connection1.start(); - - Destination tempQueue = session2.createTemporaryQueue(); - MessageProducer tempProducer = session2.createProducer(tempQueue); - - assertNotNull(advisoryConsumer.receive(5000)); - - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - try { - Thread.sleep(20); - connection1.close(); - } - catch (Exception e) { - } - } - }); - - t.start(); - - for (int i = 0; i < 256; ++i) { - Message msg = session2.createTextMessage("Temp Data"); - tempProducer.send(msg); - Thread.sleep(2); - } - - t.join(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java deleted file mode 100644 index c7b4bdb..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ResourceAllocationException; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.Topic; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3903Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class); - - private static final String bindAddress = "tcp://0.0.0.0:0"; - private BrokerService broker; - private ActiveMQConnectionFactory cf; - - private static final int MESSAGE_COUNT = 100; - - @Before - public void setUp() throws Exception { - broker = this.createBroker(); - String address = broker.getTransportConnectors().get(0).getPublishableConnectString(); - broker.start(); - broker.waitUntilStarted(); - - cf = new ActiveMQConnectionFactory(address); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test - public void testAdvisoryForFastGenericProducer() throws Exception { - doTestAdvisoryForFastProducer(true); - } - - @Test - public void testAdvisoryForFastDedicatedProducer() throws Exception { - doTestAdvisoryForFastProducer(false); - } - - public void doTestAdvisoryForFastProducer(boolean genericProducer) throws Exception { - - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - final TemporaryQueue queue = session.createTemporaryQueue(); - - final Topic advisoryTopic = AdvisorySupport.getFastProducerAdvisoryTopic((ActiveMQDestination) queue); - final Topic advisoryWhenFullTopic = AdvisorySupport.getFullAdvisoryTopic((ActiveMQDestination) queue); - - MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); - MessageConsumer advisoryWhenFullConsumer = session.createConsumer(advisoryWhenFullTopic); - - MessageProducer producer = session.createProducer(genericProducer ? null : queue); - - try { - // send lots of messages to the tempQueue - for (int i = 0; i < MESSAGE_COUNT; i++) { - BytesMessage m = session.createBytesMessage(); - m.writeBytes(new byte[1024]); - if (genericProducer) { - producer.send(queue, m, DeliveryMode.PERSISTENT, 4, 0); - } - else { - producer.send(m); - } - } - } - catch (ResourceAllocationException expectedOnLimitReachedAfterFastAdvisory) { - } - - // check one advisory message has produced on the advisoryTopic - Message advCmsg = advisoryConsumer.receive(4000); - assertNotNull(advCmsg); - - advCmsg = advisoryWhenFullConsumer.receive(4000); - assertNotNull(advCmsg); - - connection.close(); - LOG.debug("Connection closed, destinations should now become inactive."); - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setPersistent(false); - answer.setUseJmx(false); - - PolicyEntry entry = new PolicyEntry(); - entry.setAdvisoryForFastProducers(true); - entry.setAdvisoryWhenFull(true); - entry.setMemoryLimit(10000); - PolicyMap map = new PolicyMap(); - map.setDefaultEntry(entry); - - answer.setDestinationPolicy(map); - answer.addConnector(bindAddress); - - answer.getSystemUsage().setSendFailIfNoSpace(true); - - return answer; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java deleted file mode 100644 index f29ad94..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3932Test { - - static final Logger LOG = LoggerFactory.getLogger(AMQ3932Test.class); - private Connection connection; - private BrokerService broker; - - @Before - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(false); - TransportConnector tcpConnector = broker.addConnector("tcp://localhost:0"); - broker.start(); - - ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + tcpConnector.getPublishableConnectString() + ")?jms.prefetchPolicy.queuePrefetch=0"); - connection = factory.createConnection(); - connection.start(); - } - - @After - public void tearDown() throws Exception { - connection.close(); - - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - } - - @Test - public void testPlainReceiveBlocks() throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName())); - - broker.stop(); - broker.waitUntilStopped(); - broker = null; - - final CountDownLatch done = new CountDownLatch(1); - final CountDownLatch started = new CountDownLatch(1); - ExecutorService executor = Executors.newSingleThreadExecutor(); - - executor.execute(new Runnable() { - @Override - public void run() { - try { - started.countDown(); - LOG.info("Entering into a Sync receive call"); - consumer.receive(); - } - catch (JMSException e) { - } - done.countDown(); - } - }); - - assertTrue(started.await(10, TimeUnit.SECONDS)); - assertFalse(done.await(20, TimeUnit.SECONDS)); - } - - @Test - public void testHungReceiveNoWait() throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName())); - - broker.stop(); - broker.waitUntilStopped(); - broker = null; - - final CountDownLatch done = new CountDownLatch(1); - final CountDownLatch started = new CountDownLatch(1); - ExecutorService executor = Executors.newSingleThreadExecutor(); - - executor.execute(new Runnable() { - @Override - public void run() { - try { - started.countDown(); - LOG.info("Entering into a Sync receiveNoWait call"); - consumer.receiveNoWait(); - } - catch (JMSException e) { - } - done.countDown(); - } - }); - - assertTrue(started.await(10, TimeUnit.SECONDS)); - assertTrue(done.await(20, TimeUnit.SECONDS)); - } - - @Test - public void testHungReceiveTimed() throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName())); - - broker.stop(); - broker.waitUntilStopped(); - broker = null; - - final CountDownLatch done = new CountDownLatch(1); - final CountDownLatch started = new CountDownLatch(1); - ExecutorService executor = Executors.newSingleThreadExecutor(); - - executor.execute(new Runnable() { - @Override - public void run() { - try { - started.countDown(); - LOG.info("Entering into a timed Sync receive call"); - consumer.receive(10); - } - catch (JMSException e) { - } - done.countDown(); - } - }); - - assertTrue(started.await(10, TimeUnit.SECONDS)); - assertTrue(done.await(20, TimeUnit.SECONDS)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java deleted file mode 100644 index 3287085..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -public class AMQ3934Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3934Test.class); - private static BrokerService brokerService; - private static String TEST_QUEUE = "testQueue"; - private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); - private static String BROKER_ADDRESS = "tcp://localhost:0"; - - private ActiveMQConnectionFactory connectionFactory; - private String connectionUri; - private String messageID; - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - brokerService.start(); - brokerService.waitUntilStarted(); - - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - sendMessage(); - } - - public void sendMessage() throws Exception { - final Connection conn = connectionFactory.createConnection(); - try { - conn.start(); - final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Destination queue = session.createQueue(TEST_QUEUE); - final Message toSend = session.createMessage(); - final MessageProducer producer = session.createProducer(queue); - producer.send(queue, toSend); - } - finally { - conn.close(); - } - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void getMessage() throws Exception { - final QueueViewMBean queueView = getProxyToQueueViewMBean(); - final CompositeData messages[] = queueView.browse(); - messageID = (String) messages[0].get("JMSMessageID"); - assertNotNull(messageID); - assertNotNull(queueView.getMessage(messageID)); - LOG.debug("Attempting to remove message ID: " + messageID); - queueView.removeMessage(messageID); - assertNull(queueView.getMessage(messageID)); - } - - private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, NullPointerException, JMSException { - final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); - final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java deleted file mode 100644 index c39cabf..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import javax.jms.ConnectionConsumer; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.ServerSession; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.TopicConnection; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQTopic; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ3961Test { - - private static BrokerService brokerService; - private static String BROKER_ADDRESS = "tcp://localhost:0"; - - private ActiveMQConnectionFactory connectionFactory; - private String connectionUri; - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - brokerService.setDeleteAllMessagesOnStartup(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - brokerService.start(); - brokerService.waitUntilStarted(); - - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - public class TestServerSessionPool implements ServerSessionPool { - - private final TopicConnection connection; - - public TestServerSessionPool(final TopicConnection connection) { - this.connection = connection; - } - - @Override - public ServerSession getServerSession() throws JMSException { - final TopicSession topicSession = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); - return new TestServerSession(topicSession); - } - } - - public class TestServerSession implements ServerSession, MessageListener { - - private final TopicSession session; - - public TestServerSession(final TopicSession session) throws JMSException { - this.session = session; - session.setMessageListener(this); - } - - @Override - public Session getSession() throws JMSException { - return session; - } - - @Override - public void start() throws JMSException { - session.run(); - } - - @Override - public void onMessage(final Message message) { - synchronized (processedSessions) { - processedSessions.add(this); - } - } - } - - public static final int MESSAGE_COUNT = 16; - private final List processedSessions = new LinkedList<>(); - private final List committedSessions = new LinkedList<>(); - - @Test - public void testPrefetchInDurableSubscription() throws Exception { - final ActiveMQTopic topic = new ActiveMQTopic("TestTopic"); - - final TopicConnection initialSubConnection = connectionFactory.createTopicConnection(); - initialSubConnection.setClientID("TestClient"); - initialSubConnection.start(); - final TopicSession initialSubSession = initialSubConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); - final TopicSubscriber initialSubscriber = initialSubSession.createDurableSubscriber(topic, "TestSubscriber"); - - initialSubscriber.close(); - initialSubSession.close(); - initialSubConnection.close(); - - final TopicConnection publisherConnection = connectionFactory.createTopicConnection(); - publisherConnection.start(); - final TopicSession publisherSession = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - final TopicPublisher publisher = publisherSession.createPublisher(topic); - for (int i = 1; i <= MESSAGE_COUNT; i++) { - final Message msg = publisherSession.createTextMessage("Message #" + i); - publisher.publish(msg); - } - publisher.close(); - publisherSession.close(); - publisherConnection.close(); - - final TopicConnection connection = connectionFactory.createTopicConnection(); - connection.setClientID("TestClient"); - connection.start(); - final TestServerSessionPool pool = new TestServerSessionPool(connection); - final ConnectionConsumer connectionConsumer = connection.createDurableConnectionConsumer(topic, "TestSubscriber", null, pool, 1); - while (true) { - int lastMsgCount = 0; - int msgCount = 0; - do { - lastMsgCount = msgCount; - Thread.sleep(200L); - synchronized (processedSessions) { - msgCount = processedSessions.size(); - } - } while (lastMsgCount < msgCount); - - if (lastMsgCount == 0) { - break; - } - - final LinkedList collected; - synchronized (processedSessions) { - collected = new LinkedList<>(processedSessions); - processedSessions.clear(); - } - - final Iterator sessions = collected.iterator(); - while (sessions.hasNext()) { - final TestServerSession session = sessions.next(); - committedSessions.add(session); - session.getSession().commit(); - session.getSession().close(); - } - } - - connectionConsumer.close(); - final TopicSession finalSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - finalSession.unsubscribe("TestSubscriber"); - finalSession.close(); - connection.close(); - assertEquals(MESSAGE_COUNT, committedSessions.size()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java deleted file mode 100644 index 4fe8ba1..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3992Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3992Test.class); - private static BrokerService brokerService; - private static String BROKER_ADDRESS = "tcp://localhost:0"; - - private String connectionUri; - - @Before - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(true); - brokerService.setDeleteAllMessagesOnStartup(true); - connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); - brokerService.start(); - brokerService.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - connectionFactory.getPrefetchPolicy().setAll(0); - - Connection connection = connectionFactory.createConnection(); - connection.setClientID(getClass().getName()); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic("DurableTopic"); - - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub"); - - BrokerView view = brokerService.getAdminView(); - view.getDurableTopicSubscribers(); - - ObjectName subName = view.getDurableTopicSubscribers()[0]; - - DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); - - assertEquals(0, sub.getEnqueueCounter()); - - LOG.info("Enqueue counter for sub before pull requests: " + sub.getEnqueueCounter()); - - // Trigger some pull Timeouts. - consumer.receive(500); - consumer.receive(500); - consumer.receive(500); - consumer.receive(500); - consumer.receive(500); - - // Let them all timeout. - Thread.sleep(600); - - LOG.info("Enqueue counter for sub after pull requests: " + sub.getEnqueueCounter()); - assertEquals(0, sub.getEnqueueCounter()); - - consumer.close(); - session.close(); - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a3dfb84/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java deleted file mode 100644 index 8272aef..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Iterator; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.DurableTopicSubscription; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.TopicRegion; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.SubscriptionKey; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ4062Test { - - private BrokerService service; - private PolicyEntry policy; - private ConcurrentMap durableSubscriptions; - - private static final int PREFETCH_SIZE_5 = 5; - private String connectionUri; - - @Before - public void startBroker() throws IOException, Exception { - service = new BrokerService(); - service.setPersistent(true); - service.setDeleteAllMessagesOnStartup(true); - service.setUseJmx(false); - - KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter(); - File dataFile = new File("createData"); - pa.setDirectory(dataFile); - pa.setJournalMaxFileLength(1024 * 1024 * 32); - - service.setPersistenceAdapter(pa); - - policy = new PolicyEntry(); - policy.setTopic(">"); - policy.setDurableTopicPrefetch(PREFETCH_SIZE_5); - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - - service.setDestinationPolicy(pMap); - - service.addConnector("tcp://localhost:0"); - - service.start(); - service.waitUntilStarted(); - - connectionUri = service.getTransportConnectors().get(0).getPublishableConnectString(); - } - - public void restartBroker() throws IOException, Exception { - service = new BrokerService(); - service.setPersistent(true); - service.setUseJmx(false); - service.setKeepDurableSubsActive(false); - - KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter(); - File dataFile = new File("createData"); - pa.setDirectory(dataFile); - pa.setJournalMaxFileLength(1024 * 1024 * 32); - - service.setPersistenceAdapter(pa); - - policy = new PolicyEntry(); - policy.setTopic(">"); - policy.setDurableTopicPrefetch(PREFETCH_SIZE_5); - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - - service.setDestinationPolicy(pMap); - service.addConnector("tcp://localhost:0"); - service.start(); - service.waitUntilStarted(); - - connectionUri = service.getTransportConnectors().get(0).getPublishableConnectString(); - } - - @After - public void stopBroker() throws Exception { - service.stop(); - service.waitUntilStopped(); - service = null; - } - - @Test - public void testDirableSubPrefetchRecovered() throws Exception { - - PrefetchConsumer consumer = new PrefetchConsumer(true, connectionUri); - consumer.receive(); - durableSubscriptions = getDurableSubscriptions(); - ConsumerInfo info = getConsumerInfo(durableSubscriptions); - - //check if the prefetchSize equals to the size we set in the PolicyEntry - assertEquals(PREFETCH_SIZE_5, info.getPrefetchSize()); - - consumer.a.countDown(); - Producer p = new Producer(connectionUri); - p.send(); - p = null; - - service.stop(); - service.waitUntilStopped(); - durableSubscriptions = null; - - consumer = null; - stopBroker(); - - restartBroker(); - - getDurableSubscriptions(); - info = null; - info = getConsumerInfo(durableSubscriptions); - - //check if the prefetchSize equals to 0 after persistent storage recovered - //assertEquals(0, info.getPrefetchSize()); - - consumer = new PrefetchConsumer(false, connectionUri); - consumer.receive(); - consumer.a.countDown(); - - info = null; - info = getConsumerInfo(durableSubscriptions); - - //check if the prefetchSize is the default size for durable consumer and the PolicyEntry - //we set earlier take no effect - //assertEquals(100, info.getPrefetchSize()); - //info.getPrefetchSize() is 100,it should be 5,because I set the PolicyEntry as follows, - //policy.setDurableTopicPrefetch(PREFETCH_SIZE_5); - assertEquals(5, info.getPrefetchSize()); - } - - @SuppressWarnings("unchecked") - private ConcurrentMap getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException { - if (durableSubscriptions != null) - return durableSubscriptions; - RegionBroker regionBroker = (RegionBroker) service.getRegionBroker(); - TopicRegion region = (TopicRegion) regionBroker.getTopicRegion(); - Field field = TopicRegion.class.getDeclaredField("durableSubscriptions"); - field.setAccessible(true); - durableSubscriptions = (ConcurrentMap) field.get(region); - return durableSubscriptions; - } - - private ConsumerInfo getConsumerInfo(ConcurrentMap durableSubscriptions) { - ConsumerInfo info = null; - for (Iterator it = durableSubscriptions.values().iterator(); it.hasNext(); ) { - Subscription sub = it.next(); - info = sub.getConsumerInfo(); - if (info.getSubscriptionName().equals(PrefetchConsumer.SUBSCRIPTION_NAME)) { - return info; - } - } - return null; - } - - public class PrefetchConsumer implements MessageListener { - - public static final String SUBSCRIPTION_NAME = "A_NAME_ABC_DEF"; - private final String user = ActiveMQConnection.DEFAULT_USER; - private final String password = ActiveMQConnection.DEFAULT_PASSWORD; - private final String uri; - private boolean transacted; - ActiveMQConnection connection; - Session session; - MessageConsumer consumer; - private boolean needAck = false; - CountDownLatch a = new CountDownLatch(1); - - public PrefetchConsumer(boolean needAck, String uri) { - this.needAck = needAck; - this.uri = uri; - } - - public void receive() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri); - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setClientID("3"); - connection.start(); - - session = connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE); - Destination destination = session.createTopic("topic2"); - consumer = session.createDurableSubscriber((Topic) destination, SUBSCRIPTION_NAME); - consumer.setMessageListener(this); - } - - @Override - public void onMessage(Message message) { - try { - a.await(); - } - catch (InterruptedException e1) { - } - if (needAck) { - try { - message.acknowledge(); - consumer.close(); - session.close(); - connection.close(); - } - catch (JMSException e) { - } - } - } - } - - public class Producer { - - protected final String user = ActiveMQConnection.DEFAULT_USER; - - private final String password = ActiveMQConnection.DEFAULT_PASSWORD; - private final String uri; - private boolean transacted; - - public Producer(String uri) { - this.uri = uri; - } - - public void send() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - - ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic("topic2"); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < 100; i++) { - TextMessage om = session.createTextMessage("hello from producer"); - producer.send(om); - } - producer.close(); - session.close(); - connection.close(); - } - } -}