Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6D21A19A7A for ; Fri, 18 Mar 2016 01:41:52 +0000 (UTC) Received: (qmail 31943 invoked by uid 500); 18 Mar 2016 01:41:52 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 31793 invoked by uid 500); 18 Mar 2016 01:41:52 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 30305 invoked by uid 99); 18 Mar 2016 01:41:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Mar 2016 01:41:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E8052E78B8; Fri, 18 Mar 2016 01:41:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 18 Mar 2016 01:42:10 -0000 Message-Id: <76257b1bd9de4a2896b7ff23dd674fe1@git.apache.org> In-Reply-To: <5f9c254d77c94d93a974dbbe3330320c@git.apache.org> References: <5f9c254d77c94d93a974dbbe3330320c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/65] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java deleted file mode 100644 index 74c19b7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java +++ /dev/null @@ -1,763 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertTrue; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.network.DiscoveryNetworkConnector; -import org.apache.activemq.network.NetworkConnector; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3274Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class); - - protected static int Next_broker_num = 0; - protected EmbeddedTcpBroker broker1; - protected EmbeddedTcpBroker broker2; - - protected int nextEchoId = 0; - protected boolean testError = false; - - protected int echoResponseFill = 0; // Number of "filler" response messages per request - - public AMQ3274Test() throws Exception { - broker1 = new EmbeddedTcpBroker(); - broker2 = new EmbeddedTcpBroker(); - - broker1.coreConnectTo(broker2, true); - broker2.coreConnectTo(broker1, true); - } - - public void logMessage(String msg) { - System.out.println(msg); - System.out.flush(); - } - - public void testMessages(Session sess, - MessageProducer req_prod, - Destination resp_dest, - int num_msg) throws Exception { - MessageConsumer resp_cons; - TextMessage msg; - MessageClient cons_client; - int cur; - int tot_expected; - - resp_cons = sess.createConsumer(resp_dest); - - cons_client = new MessageClient(resp_cons, num_msg); - cons_client.start(); - - cur = 0; - while ((cur < num_msg) && (!testError)) { - msg = sess.createTextMessage("MSG AAAA " + cur); - msg.setIntProperty("SEQ", 100 + cur); - msg.setStringProperty("TEST", "TOPO"); - msg.setJMSReplyTo(resp_dest); - - if (cur == (num_msg - 1)) - msg.setBooleanProperty("end-of-response", true); - - req_prod.send(msg); - - cur++; - } - - cons_client.waitShutdown(5000); - - if (cons_client.shutdown()) { - LOG.debug("Consumer client shutdown complete"); - } - else { - LOG.debug("Consumer client shutdown incomplete!!!"); - } - - tot_expected = num_msg * (echoResponseFill + 1); - - if (cons_client.getNumMsgReceived() == tot_expected) { - LOG.info("Have " + tot_expected + " messages, as-expected"); - } - else { - LOG.error("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected); - testError = true; - } - - resp_cons.close(); - } - - /** - * Test one destination between the given "producer broker" and - * "consumer broker" specified. - */ - public void testOneDest(Connection conn, - Session sess, - Destination cons_dest, - String prod_broker_url, - String cons_broker_url, - int num_msg) throws Exception { - int echo_id; - - EchoService echo_svc; - String echo_queue_name; - Destination prod_dest; - MessageProducer msg_prod; - - synchronized (this) { - echo_id = this.nextEchoId; - this.nextEchoId++; - } - - echo_queue_name = "echo.queue." + echo_id; - - LOG.trace("destroying the echo queue in case an old one exists"); - removeQueue(conn, echo_queue_name); - - echo_svc = new EchoService(echo_queue_name, prod_broker_url); - echo_svc.start(); - - LOG.trace("Creating echo queue and producer"); - prod_dest = sess.createQueue(echo_queue_name); - msg_prod = sess.createProducer(prod_dest); - - testMessages(sess, msg_prod, cons_dest, num_msg); - - echo_svc.shutdown(); - msg_prod.close(); - } - - /** - * TEST TEMPORARY TOPICS - */ - public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception { - Connection conn; - Session sess; - Destination cons_dest; - int num_msg; - - num_msg = 5; - - LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); - - conn = createConnection(cons_broker_url); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - LOG.trace("Creating destination"); - cons_dest = sess.createTemporaryTopic(); - - testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); - - sess.close(); - conn.close(); - } - - /** - * TEST TOPICS - */ - public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception { - int num_msg; - - Connection conn; - Session sess; - String topic_name; - - Destination cons_dest; - - num_msg = 5; - - LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); - - conn = createConnection(cons_broker_url); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - topic_name = "topotest2.perm.topic"; - LOG.trace("Removing existing Topic"); - removeTopic(conn, topic_name); - LOG.trace("Creating Topic, " + topic_name); - cons_dest = sess.createTopic(topic_name); - - testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); - - removeTopic(conn, topic_name); - sess.close(); - conn.close(); - } - - /** - * TEST TEMPORARY QUEUES - */ - public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception { - int num_msg; - - Connection conn; - Session sess; - - Destination cons_dest; - - num_msg = 5; - - LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); - - conn = createConnection(cons_broker_url); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - LOG.trace("Creating destination"); - cons_dest = sess.createTemporaryQueue(); - - testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); - - sess.close(); - conn.close(); - } - - /** - * TEST QUEUES - */ - public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception { - int num_msg; - - Connection conn; - Session sess; - String queue_name; - - Destination cons_dest; - - num_msg = 5; - - LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); - - conn = createConnection(cons_broker_url); - conn.start(); - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - queue_name = "topotest2.perm.queue"; - LOG.trace("Removing existing Queue"); - removeQueue(conn, queue_name); - LOG.trace("Creating Queue, " + queue_name); - cons_dest = sess.createQueue(queue_name); - - testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); - - removeQueue(conn, queue_name); - sess.close(); - conn.close(); - } - - @Test - public void run() throws Exception { - Thread start1; - Thread start2; - - testError = false; - - // Use threads to avoid startup deadlock since the first broker started waits until - // it knows the name of the remote broker before finishing its startup, which means - // the remote must already be running. - - start1 = new Thread() { - @Override - public void run() { - try { - broker1.start(); - } - catch (Exception ex) { - LOG.error(null, ex); - } - } - }; - - start2 = new Thread() { - @Override - public void run() { - try { - broker2.start(); - } - catch (Exception ex) { - LOG.error(null, ex); - } - } - }; - - start1.start(); - start2.start(); - - start1.join(); - start2.join(); - - if (!testError) { - this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl()); - } - if (!testError) { - this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl()); - } - if (!testError) { - this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl()); - } - if (!testError) { - this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl()); - } - Thread.sleep(100); - - shutdown(); - - assertTrue(!testError); - } - - public void shutdown() throws Exception { - broker1.stop(); - broker2.stop(); - } - - /** - * @param args the command line arguments - */ - public static void main(String[] args) { - AMQ3274Test main_obj; - - try { - main_obj = new AMQ3274Test(); - main_obj.run(); - } - catch (Exception ex) { - ex.printStackTrace(); - LOG.error(null, ex); - System.exit(0); - } - } - - protected Connection createConnection(String url) throws Exception { - return org.apache.activemq.ActiveMQConnection.makeConnection(url); - } - - protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception { - org.apache.activemq.command.ActiveMQDestination dest; - - if (conn instanceof org.apache.activemq.ActiveMQConnection) { - dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE); - ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest); - } - } - - protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception { - org.apache.activemq.command.ActiveMQDestination dest; - - if (conn instanceof org.apache.activemq.ActiveMQConnection) { - dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE); - ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest); - } - } - - @SuppressWarnings("rawtypes") - public static String fmtMsgInfo(Message msg) throws Exception { - StringBuilder msg_desc; - String prop; - Enumeration prop_enum; - - msg_desc = new StringBuilder(); - msg_desc = new StringBuilder(); - - if (msg instanceof TextMessage) { - msg_desc.append(((TextMessage) msg).getText()); - } - else { - msg_desc.append("["); - msg_desc.append(msg.getClass().getName()); - msg_desc.append("]"); - } - - prop_enum = msg.getPropertyNames(); - while (prop_enum.hasMoreElements()) { - prop = (String) prop_enum.nextElement(); - msg_desc.append("; "); - msg_desc.append(prop); - msg_desc.append("="); - msg_desc.append(msg.getStringProperty(prop)); - } - - return msg_desc.toString(); - } - - // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // /////////////////////////////////////////////// INTERNAL CLASSES - // ///////////////////////////////////////////////// - // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - protected class EmbeddedTcpBroker { - - protected BrokerService brokerSvc; - protected int brokerNum; - protected String brokerName; - protected String brokerId; - protected int port; - protected String tcpUrl; - - public EmbeddedTcpBroker() throws Exception { - brokerSvc = new BrokerService(); - - synchronized (this.getClass()) { - brokerNum = Next_broker_num; - Next_broker_num++; - } - - brokerName = "broker" + brokerNum; - brokerId = "b" + brokerNum; - - brokerSvc.setBrokerName(brokerName); - brokerSvc.setBrokerId(brokerId); - brokerSvc.setPersistent(false); - brokerSvc.setUseJmx(false); - tcpUrl = brokerSvc.addConnector("tcp://localhost:0").getPublishableConnectString(); - } - - public Connection createConnection() throws URISyntaxException, JMSException { - Connection result; - - result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl); - - return result; - } - - public String getConnectionUrl() { - return this.tcpUrl; - } - - /** - * Create network connections to the given broker using the - * network-connector configuration of CORE brokers (e.g. - * core1.bus.dev1.coresys.tmcs) - * - * @param other - * @param duplex_f - */ - public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception { - this.makeConnectionTo(other, duplex_f, true); - this.makeConnectionTo(other, duplex_f, false); - } - - public void start() throws Exception { - brokerSvc.start(); - } - - public void stop() throws Exception { - brokerSvc.stop(); - } - - /** - * Make one connection to the other embedded broker, of the specified - * type (queue or topic) using the standard CORE broker networking. - * - * @param other - * @param duplex_f - * @param queue_f - * @throws Exception - */ - protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception { - NetworkConnector nw_conn; - String prefix; - ActiveMQDestination excl_dest; - ArrayList excludes; - - nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")")); - nw_conn.setDuplex(duplex_f); - - if (queue_f) - nw_conn.setConduitSubscriptions(false); - else - nw_conn.setConduitSubscriptions(true); - - nw_conn.setNetworkTTL(5); - nw_conn.setSuppressDuplicateQueueSubscriptions(true); - nw_conn.setDecreaseNetworkConsumerPriority(true); - nw_conn.setBridgeTempDestinations(true); - - if (queue_f) { - prefix = "queue"; - excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE); - } - else { - prefix = "topic"; - excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE); - } - - excludes = new ArrayList<>(); - excludes.add(excl_dest); - nw_conn.setExcludedDestinations(excludes); - - if (duplex_f) - nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId); - else - nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId); - - brokerSvc.addNetworkConnector(nw_conn); - } - } - - protected class MessageClient extends java.lang.Thread { - - protected MessageConsumer msgCons; - protected boolean shutdownInd; - protected int expectedCount; - protected int lastSeq = 0; - protected int msgCount = 0; - protected boolean haveFirstSeq; - protected CountDownLatch shutdownLatch; - - public MessageClient(MessageConsumer cons, int num_to_expect) { - msgCons = cons; - expectedCount = (num_to_expect * (echoResponseFill + 1)); - shutdownLatch = new CountDownLatch(1); - } - - @Override - public void run() { - CountDownLatch latch; - - try { - synchronized (this) { - latch = shutdownLatch; - } - - shutdownInd = false; - processMessages(); - - latch.countDown(); - } - catch (Exception exc) { - LOG.error("message client error", exc); - } - } - - public void waitShutdown(long timeout) { - CountDownLatch latch; - - try { - synchronized (this) { - latch = shutdownLatch; - } - - if (latch != null) - latch.await(timeout, TimeUnit.MILLISECONDS); - else - LOG.info("echo client shutdown: client does not appear to be active"); - } - catch (InterruptedException int_exc) { - LOG.warn("wait for message client shutdown interrupted", int_exc); - } - } - - public boolean shutdown() { - boolean down_ind; - - if (!shutdownInd) { - shutdownInd = true; - } - - waitShutdown(200); - - synchronized (this) { - if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0)) - down_ind = true; - else - down_ind = false; - } - - return down_ind; - } - - public int getNumMsgReceived() { - return msgCount; - } - - protected void processMessages() throws Exception { - Message in_msg; - - haveFirstSeq = false; - while ((!shutdownInd) && (!testError)) { - in_msg = msgCons.receive(100); - - if (in_msg != null) { - msgCount++; - checkMessage(in_msg); - } - } - } - - protected void checkMessage(Message in_msg) throws Exception { - int seq; - - LOG.debug("received message " + fmtMsgInfo(in_msg)); - - if (in_msg.propertyExists("SEQ")) { - seq = in_msg.getIntProperty("SEQ"); - - if ((haveFirstSeq) && (seq != (lastSeq + 1))) { - LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq + 1) + " but have " + Integer.toString(seq)); - - testError = true; - } - - lastSeq = seq; - - if (msgCount > expectedCount) { - LOG.warn("*** have more messages than expected; have " + msgCount + "; expect " + expectedCount); - - testError = true; - } - } - - if (in_msg.propertyExists("end-of-response")) { - LOG.trace("received end-of-response message"); - shutdownInd = true; - } - } - } - - protected class EchoService extends java.lang.Thread { - - protected String destName; - protected Connection jmsConn; - protected Session sess; - protected MessageConsumer msg_cons; - protected boolean Shutdown_ind; - - protected Destination req_dest; - protected Destination resp_dest; - protected MessageProducer msg_prod; - - protected CountDownLatch waitShutdown; - - public EchoService(String dest, Connection broker_conn) throws Exception { - destName = dest; - jmsConn = broker_conn; - - Shutdown_ind = false; - - sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); - req_dest = sess.createQueue(destName); - msg_cons = sess.createConsumer(req_dest); - - jmsConn.start(); - - waitShutdown = new CountDownLatch(1); - } - - public EchoService(String dest, String broker_url) throws Exception { - this(dest, ActiveMQConnection.makeConnection(broker_url)); - } - - @Override - public void run() { - Message req; - - try { - LOG.info("STARTING ECHO SERVICE"); - - while (!Shutdown_ind) { - req = msg_cons.receive(100); - if (req != null) { - if (LOG.isDebugEnabled()) - LOG.debug("ECHO request message " + req.toString()); - - resp_dest = req.getJMSReplyTo(); - if (resp_dest != null) { - msg_prod = sess.createProducer(resp_dest); - msg_prod.send(req); - msg_prod.close(); - msg_prod = null; - } - else { - LOG.warn("invalid request: no reply-to destination given"); - } - } - } - } - catch (Exception ex) { - LOG.error(null, ex); - } - finally { - LOG.info("shutting down test echo service"); - - try { - jmsConn.stop(); - } - catch (javax.jms.JMSException jms_exc) { - LOG.warn("error on shutting down JMS connection", jms_exc); - } - - synchronized (this) { - waitShutdown.countDown(); - } - } - } - - /** - * Shut down the service, waiting up to 3 seconds for the service to - * terminate. - */ - public void shutdown() { - CountDownLatch wait_l; - - synchronized (this) { - wait_l = waitShutdown; - } - - Shutdown_ind = true; - - try { - if (wait_l != null) { - if (wait_l.await(3000, TimeUnit.MILLISECONDS)) { - LOG.info("echo service shutdown complete"); - } - else { - LOG.warn("timeout waiting for echo service shutdown"); - } - } - else { - LOG.info("echo service shutdown: service does not appear to be active"); - } - } - catch (InterruptedException int_exc) { - LOG.warn("interrupted while waiting for echo service shutdown"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java deleted file mode 100644 index a90521b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.Topic; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.advisory.AdvisorySupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.virtual.MirroredQueue; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3324Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class); - - private static final String bindAddress = "tcp://0.0.0.0:0"; - private BrokerService broker; - private ActiveMQConnectionFactory cf; - - private static final int MESSAGE_COUNT = 100; - - @Before - public void setUp() throws Exception { - broker = this.createBroker(); - String address = broker.getTransportConnectors().get(0).getPublishableConnectString(); - broker.start(); - broker.waitUntilStarted(); - - cf = new ActiveMQConnectionFactory(address); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test - public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception { - - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - final TemporaryQueue queue = session.createTemporaryQueue(); - MessageConsumer consumer = session.createConsumer(queue); - - final Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); - - MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); - MessageProducer producer = session.createProducer(queue); - - // send lots of messages to the tempQueue - for (int i = 0; i < MESSAGE_COUNT; i++) { - BytesMessage m = session.createBytesMessage(); - m.writeBytes(new byte[1024]); - producer.send(m); - } - - // consume one message from tempQueue - Message msg = consumer.receive(5000); - assertNotNull(msg); - - // check one advisory message has produced on the advisoryTopic - Message advCmsg = advisoryConsumer.receive(5000); - assertNotNull(advCmsg); - - connection.close(); - LOG.debug("Connection closed, destinations should now become inactive."); - - assertTrue("The destination " + advisoryTopic + "was not removed. ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker.getAdminView().getTopics().length == 0; - } - })); - - assertTrue("The destination " + queue + " was not removed. ", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return broker.getAdminView().getTemporaryQueues().length == 0; - } - })); - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setUseMirroredQueues(true); - answer.setPersistent(false); - answer.setSchedulePeriodForDestinationPurge(1000); - - PolicyEntry entry = new PolicyEntry(); - entry.setGcInactiveDestinations(true); - entry.setInactiveTimeoutBeforeGC(2000); - entry.setProducerFlowControl(true); - entry.setAdvisoryForConsumed(true); - entry.setAdvisoryForFastProducers(true); - entry.setAdvisoryForDelivery(true); - PolicyMap map = new PolicyMap(); - map.setDefaultEntry(entry); - - MirroredQueue mirrorQ = new MirroredQueue(); - mirrorQ.setCopyMessage(true); - DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ}; - answer.setDestinationInterceptors(destinationInterceptors); - - answer.setDestinationPolicy(map); - answer.addConnector(bindAddress); - - return answer; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java deleted file mode 100644 index 6f27bdd..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ3352Test { - - TransportConnector connector; - BrokerService brokerService; - - @Before - public void startBroker() throws Exception { - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(true); - connector = brokerService.addConnector("tcp://0.0.0.0:0"); - brokerService.start(); - } - - @After - public void stopBroker() throws Exception { - brokerService.stop(); - } - - @Test - public void verifyEnqueueLargeNumWithStateTracker() throws Exception { - String url = "failover:(" + connector.getPublishableConnectString() + ")?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=131072"; - - ActiveMQConnection conn = (ActiveMQConnection) new ActiveMQConnectionFactory(url).createConnection(null, null); - - Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - MessageProducer producer = session.createProducer(session.createQueue("EVENTQ")); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - producer.setDisableMessageID(true); - producer.setDisableMessageTimestamp(true); - - StringBuffer buffer = new StringBuffer(); - for (int i = 0; i < 1024; i++) { - buffer.append(String.valueOf(Math.random())); - } - String payload = buffer.toString(); - - for (int i = 0; i < 10000; i++) { - StringBuffer buff = new StringBuffer("x"); - buff.append(payload); - producer.send(session.createTextMessage(buff.toString())); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java deleted file mode 100644 index 5a58dd3..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.policy.DeadLetterStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3405Test extends TestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ3405Test.class); - - private Connection connection; - private Session session; - private MessageConsumer consumer; - private MessageProducer producer; - private int deliveryMode = DeliveryMode.PERSISTENT; - private Destination dlqDestination; - private MessageConsumer dlqConsumer; - private BrokerService broker; - - private int messageCount; - private Destination destination; - private int rollbackCount; - private Session dlqSession; - private final Error[] error = new Error[1]; - private boolean topic = true; - private boolean durableSubscriber = true; - - public void testTransientTopicMessage() throws Exception { - topic = true; - deliveryMode = DeliveryMode.NON_PERSISTENT; - durableSubscriber = true; - doTest(); - } - - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - PolicyEntry policy = new PolicyEntry(); - DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy(); - if (defaultDeadLetterStrategy != null) { - defaultDeadLetterStrategy.setProcessNonPersistent(true); - } - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - broker.setDestinationPolicy(pMap); - return broker; - } - - protected void doTest() throws Exception { - messageCount = 200; - connection.start(); - - final QueueViewMBean dlqView = getProxyToDLQ(); - - ActiveMQConnection amqConnection = (ActiveMQConnection) connection; - rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; - LOG.info("Will redeliver messages: " + rollbackCount + " times"); - - makeConsumer(); - makeDlqConsumer(); - dlqConsumer.close(); - - sendMessages(); - - // now lets receive and rollback N times - int maxRollbacks = messageCount * rollbackCount; - - consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, rollbackCount)); - - // We receive and rollback into the DLQ N times moving the DLQ messages back to their - // original Q to test that they are continually placed back in the DLQ. - for (int i = 0; i < 2; ++i) { - - assertTrue("DLQ was not filled as expected", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return dlqView.getQueueSize() == messageCount; - } - })); - - connection.stop(); - - assertEquals("DLQ should be full now.", messageCount, dlqView.getQueueSize()); - - String moveTo; - if (topic) { - moveTo = "topic://" + ((Topic) getDestination()).getTopicName(); - } - else { - moveTo = "queue://" + ((Queue) getDestination()).getQueueName(); - } - - LOG.debug("Moving " + messageCount + " messages from ActiveMQ.DLQ to " + moveTo); - dlqView.moveMatchingMessagesTo("", moveTo); - - assertTrue("DLQ was not emptied as expected", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return dlqView.getQueueSize() == 0; - } - })); - - connection.start(); - } - } - - protected void makeConsumer() throws JMSException { - Destination destination = getDestination(); - LOG.info("Consuming from: " + destination); - if (durableSubscriber) { - consumer = session.createDurableSubscriber((Topic) destination, destination.toString()); - } - else { - consumer = session.createConsumer(destination); - } - } - - protected void makeDlqConsumer() throws JMSException { - dlqDestination = createDlqDestination(); - - LOG.info("Consuming from dead letter on: " + dlqDestination); - dlqConsumer = dlqSession.createConsumer(dlqDestination); - } - - @Override - protected void setUp() throws Exception { - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - - connection = createConnection(); - connection.setClientID(createClientId()); - - session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - connection.start(); - - dlqSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - @Override - protected void tearDown() throws Exception { - dlqConsumer.close(); - dlqSession.close(); - session.close(); - - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Override - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory answer = super.createConnectionFactory(); - RedeliveryPolicy policy = new RedeliveryPolicy(); - policy.setMaximumRedeliveries(3); - policy.setBackOffMultiplier((short) 1); - policy.setRedeliveryDelay(0); - policy.setInitialRedeliveryDelay(0); - policy.setUseExponentialBackOff(false); - answer.setRedeliveryPolicy(policy); - return answer; - } - - protected void sendMessages() throws JMSException { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(getDestination()); - producer.setDeliveryMode(deliveryMode); - - LOG.info("Sending " + messageCount + " messages to: " + getDestination()); - for (int i = 0; i < messageCount; i++) { - Message message = createMessage(session, i); - producer.send(message); - } - } - - protected TextMessage createMessage(Session session, int i) throws JMSException { - return session.createTextMessage(getMessageText(i)); - } - - protected String getMessageText(int i) { - return "message: " + i; - } - - protected Destination createDlqDestination() { - return new ActiveMQQueue("ActiveMQ.DLQ"); - } - - private QueueViewMBean getProxyToDLQ() throws MalformedObjectNameException, JMSException { - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost," + "destinationType=Queue,destinationName=ActiveMQ.DLQ"); - QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } - - protected Destination getDestination() { - if (destination == null) { - destination = createDestination(); - } - return destination; - } - - protected String createClientId() { - return toString(); - } - - class RollbackMessageListener implements MessageListener { - - final int maxRollbacks; - final int deliveryCount; - final AtomicInteger rollbacks = new AtomicInteger(); - - RollbackMessageListener(int c, int delvery) { - maxRollbacks = c; - deliveryCount = delvery; - } - - @Override - public void onMessage(Message message) { - try { - int expectedMessageId = rollbacks.get() / deliveryCount; - LOG.info("expecting messageId: " + expectedMessageId); - rollbacks.incrementAndGet(); - session.rollback(); - } - catch (Throwable e) { - LOG.error("unexpected exception:" + e, e); - // propagating assertError to execution task will cause a hang - // at shutdown - if (e instanceof Error) { - error[0] = (Error) e; - } - else { - fail("unexpected exception: " + e); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java deleted file mode 100644 index 8fd2765..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.net.URI; -import java.util.Random; -import java.util.concurrent.CountDownLatch; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQMessageConsumer; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3436Test { - - protected static final Logger LOG = LoggerFactory.getLogger(AMQ3436Test.class); - - private BrokerService broker; - private PersistenceAdapter adapter; - private boolean useCache = true; - private boolean prioritizeMessages = true; - - protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception { - KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); - adapter.setConcurrentStoreAndDispatchQueues(false); - adapter.setConcurrentStoreAndDispatchTopics(false); - adapter.deleteAllMessages(); - return adapter; - } - - @Before - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setBrokerName("priorityTest"); - broker.setAdvisorySupport(false); - broker.setUseJmx(false); - adapter = createPersistenceAdapter(true); - broker.setPersistenceAdapter(adapter); - PolicyEntry policy = new PolicyEntry(); - policy.setPrioritizedMessages(prioritizeMessages); - policy.setUseCache(useCache); - policy.setProducerFlowControl(false); - PolicyMap policyMap = new PolicyMap(); - policyMap.put(new ActiveMQQueue("TEST"), policy); - - // do not process expired for one test - PolicyEntry ignoreExpired = new PolicyEntry(); - SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy(); - ignoreExpiredStrategy.setProcessExpired(false); - ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy); - - broker.setDestinationPolicy(policyMap); - broker.start(); - broker.waitUntilStarted(); - } - - protected void tearDown() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - @Test - public void testPriorityWhenConsumerCreatedBeforeProduction() throws Exception { - - int messageCount = 200; - URI failoverUri = new URI("vm://priorityTest?jms.prefetchPolicy.all=1"); - - ActiveMQQueue dest = new ActiveMQQueue("TEST?consumer.dispatchAsync=false"); - - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri); - cf.setDispatchAsync(false); - - // Create producer - ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection(); - producerConnection.setMessagePrioritySupported(true); - producerConnection.start(); - final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = producerSession.createProducer(dest); - - ActiveMQMessageConsumer consumer; - - // Create consumer on separate connection - ActiveMQConnection consumerConnection = (ActiveMQConnection) cf.createConnection(); - consumerConnection.setMessagePrioritySupported(true); - consumerConnection.start(); - final ActiveMQSession consumerSession = (ActiveMQSession) consumerConnection.createSession(true, Session.SESSION_TRANSACTED); - consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest); - - // Produce X number of messages with a session commit after each message - Random random = new Random(); - for (int i = 0; i < messageCount; ++i) { - - Message message = producerSession.createTextMessage("Test message #" + i); - producer.send(message, DeliveryMode.PERSISTENT, random.nextInt(10), 45 * 1000); - producerSession.commit(); - } - producer.close(); - - // *************************************************** - // If we create the consumer here instead of above, the - // the messages will be consumed in priority order - // *************************************************** - //consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest); - - // Consume all of the messages we produce using a listener. - // Don't exit until we get all the messages. - final CountDownLatch latch = new CountDownLatch(messageCount); - final StringBuffer failureMessage = new StringBuffer(); - consumer.setMessageListener(new MessageListener() { - int lowestPrioritySeen = 10; - - boolean firstMessage = true; - - @Override - public void onMessage(Message msg) { - try { - - int currentPriority = msg.getJMSPriority(); - LOG.debug(currentPriority + "<=" + lowestPrioritySeen); - - // Ignore the first message priority since it is prefetched - // and is out of order by design - if (firstMessage == true) { - firstMessage = false; - LOG.debug("Ignoring first message since it was prefetched"); - - } - else { - - // Verify that we never see a priority higher than the - // lowest - // priority seen - if (lowestPrioritySeen > currentPriority) { - lowestPrioritySeen = currentPriority; - } - if (lowestPrioritySeen < currentPriority) { - failureMessage.append("Incorrect priority seen (Lowest Priority = " + lowestPrioritySeen + " Current Priority = " + currentPriority + ")" + System.getProperty("line.separator")); - } - } - - } - catch (JMSException e) { - e.printStackTrace(); - } - finally { - latch.countDown(); - LOG.debug("Messages remaining = " + latch.getCount()); - } - } - }); - - latch.await(); - consumer.close(); - - // Cleanup producer resources - producerSession.close(); - producerConnection.stop(); - producerConnection.close(); - - // Cleanup consumer resources - consumerSession.close(); - consumerConnection.stop(); - consumerConnection.close(); - - // Report the failure if found - if (failureMessage.length() > 0) { - Assert.fail(failureMessage.toString()); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java deleted file mode 100644 index d36faf9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ3445Test { - - private ConnectionFactory connectionFactory; - private BrokerService broker; - private String connectionUri; - - private final String queueName = "Consumer.MyApp.VirtualTopic.FOO"; - private final String topicName = "VirtualTopic.FOO"; - - @Before - public void startBroker() throws Exception { - createBroker(true); - } - - private void createBroker(boolean deleteMessages) throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(deleteMessages); - broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); - broker.setAdvisorySupport(false); - broker.addConnector("tcp://0.0.0.0:0"); - broker.start(); - broker.waitUntilStarted(); - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - } - - private void restartBroker() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - - createBroker(false); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test - public void testJDBCRetiansDestinationAfterRestart() throws Exception { - - broker.getAdminView().addQueue(queueName); - broker.getAdminView().addTopic(topicName); - - assertTrue(findDestination(queueName, false)); - assertTrue(findDestination(topicName, true)); - - QueueViewMBean queue = getProxyToQueueViewMBean(); - assertEquals(0, queue.getQueueSize()); - - restartBroker(); - - assertTrue(findDestination(queueName, false)); - queue = getProxyToQueueViewMBean(); - assertEquals(0, queue.getQueueSize()); - - sendMessage(); - restartBroker(); - assertTrue(findDestination(queueName, false)); - - queue = getProxyToQueueViewMBean(); - assertEquals(1, queue.getQueueSize()); - sendMessage(); - assertEquals(2, queue.getQueueSize()); - - restartBroker(); - assertTrue(findDestination(queueName, false)); - queue = getProxyToQueueViewMBean(); - assertEquals(2, queue.getQueueSize()); - } - - private void sendMessage() throws Exception { - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createTopic(topicName)); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - producer.send(session.createTextMessage("Testing")); - producer.close(); - connection.close(); - } - - private QueueViewMBean getProxyToQueueViewMBean() throws Exception { - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + queueName + ",type=Broker,brokerName=localhost"); - QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } - - private boolean findDestination(String name, boolean topic) throws Exception { - - ObjectName[] destinations; - - if (topic) { - destinations = broker.getAdminView().getTopics(); - } - else { - destinations = broker.getAdminView().getQueues(); - } - - for (ObjectName destination : destinations) { - if (destination.toString().contains(name)) { - return true; - } - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java deleted file mode 100644 index 96f0c2c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3454Test extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ3454Test.class); - private static final int MESSAGES_COUNT = 10000; - - public void testSendWithLotsOfDestinations() throws Exception { - final BrokerService broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setDeleteAllMessagesOnStartup(true); - - broker.addConnector("tcp://localhost:0"); - - // populate a bunch of destinations, validate the impact on a call to send - ActiveMQDestination[] destinations = new ActiveMQDestination[MESSAGES_COUNT]; - for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { - destinations[idx] = new ActiveMQQueue(getDestinationName() + "-" + idx); - } - broker.setDestinations(destinations); - broker.start(); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); - final Connection connection = factory.createConnection(); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName())); - - long start = System.currentTimeMillis(); - for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { - Message message = session.createTextMessage("" + idx); - producer.send(message); - } - LOG.info("Duration: " + (System.currentTimeMillis() - start) + " millis"); - producer.close(); - session.close(); - - } - - protected String getDestinationName() { - return getClass().getName() + "." + getName(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java deleted file mode 100644 index 5e6b2ff..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.*; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.XAConnection; -import javax.jms.XAConnectionFactory; -import javax.jms.XASession; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQMessageProducer; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.ActiveMQXAConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ3465Test { - - private final String xaDestinationName = "DestinationXA"; - private final String destinationName = "Destination"; - private BrokerService broker; - private String connectionUri; - private long txGenerator = System.currentTimeMillis(); - - private XAConnectionFactory xaConnectionFactory; - private ConnectionFactory connectionFactory; - - @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.addConnector("tcp://0.0.0.0:0"); - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - xaConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri); - } - - @After - public void stopBroker() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - @Test - public void testMixedXAandNonXAorTXSessions() throws Exception { - - XAConnection xaConnection = xaConnectionFactory.createXAConnection(); - xaConnection.start(); - XASession session = xaConnection.createXASession(); - XAResource resource = session.getXAResource(); - Destination dest = new ActiveMQQueue(xaDestinationName); - - // publish a message - Xid tid = createXid(); - resource.start(tid, XAResource.TMNOFLAGS); - MessageProducer producer = session.createProducer(dest); - ActiveMQTextMessage message = new ActiveMQTextMessage(); - message.setText("Some Text"); - producer.send(message); - resource.end(tid, XAResource.TMSUCCESS); - resource.commit(tid, true); - session.close(); - - session = xaConnection.createXASession(); - MessageConsumer consumer = session.createConsumer(dest); - tid = createXid(); - resource = session.getXAResource(); - resource.start(tid, XAResource.TMNOFLAGS); - TextMessage receivedMessage = (TextMessage) consumer.receive(1000); - assertNotNull(receivedMessage); - assertEquals("Some Text", receivedMessage.getText()); - resource.end(tid, XAResource.TMSUCCESS); - - // Test that a normal session doesn't operate on XASession state. - Connection connection2 = connectionFactory.createConnection(); - connection2.start(); - ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (session2.isTransacted()) { - session2.rollback(); - } - - session2.close(); - - resource.commit(tid, true); - } - - @Test - public void testMixedXAandNonXALocalTXSessions() throws Exception { - - XAConnection xaConnection = xaConnectionFactory.createXAConnection(); - xaConnection.start(); - XASession session = xaConnection.createXASession(); - XAResource resource = session.getXAResource(); - Destination dest = new ActiveMQQueue(xaDestinationName); - - // publish a message - Xid tid = createXid(); - resource.start(tid, XAResource.TMNOFLAGS); - MessageProducer producer = session.createProducer(dest); - ActiveMQTextMessage message = new ActiveMQTextMessage(); - message.setText("Some Text"); - producer.send(message); - resource.end(tid, XAResource.TMSUCCESS); - resource.commit(tid, true); - session.close(); - - session = xaConnection.createXASession(); - MessageConsumer consumer = session.createConsumer(dest); - tid = createXid(); - resource = session.getXAResource(); - resource.start(tid, XAResource.TMNOFLAGS); - TextMessage receivedMessage = (TextMessage) consumer.receive(1000); - assertNotNull(receivedMessage); - assertEquals("Some Text", receivedMessage.getText()); - resource.end(tid, XAResource.TMSUCCESS); - - // Test that a normal session doesn't operate on XASession state. - Connection connection2 = connectionFactory.createConnection(); - connection2.start(); - ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = new ActiveMQQueue(destinationName); - ActiveMQMessageProducer producer2 = (ActiveMQMessageProducer) session2.createProducer(destination); - producer2.send(session2.createTextMessage("Local-TX")); - - if (session2.isTransacted()) { - session2.rollback(); - } - - session2.close(); - - resource.commit(tid, true); - } - - public Xid createXid() throws IOException { - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream os = new DataOutputStream(baos); - os.writeLong(++txGenerator); - os.close(); - final byte[] bs = baos.toByteArray(); - - return new Xid() { - @Override - public int getFormatId() { - return 86; - } - - @Override - public byte[] getGlobalTransactionId() { - return bs; - } - - @Override - public byte[] getBranchQualifier() { - return bs; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java deleted file mode 100644 index 3d9d2d4..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.activemq.bugs; - -import java.util.Properties; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3529Test { - - private static Logger LOG = LoggerFactory.getLogger(AMQ3529Test.class); - - private ConnectionFactory connectionFactory; - private Connection connection; - private Session session; - private BrokerService broker; - private String connectionUri; - private MessageConsumer consumer; - private Context ctx = null; - - @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.addConnector("tcp://0.0.0.0:0"); - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - - connectionFactory = new ActiveMQConnectionFactory(connectionUri); - } - - @After - public void stopBroker() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - @Test(timeout = 60000) - public void testInterruptionAffects() throws Exception { - ThreadGroup tg = new ThreadGroup("tg"); - - assertEquals(0, tg.activeCount()); - - Thread client = new Thread(tg, "client") { - - @Override - public void run() { - try { - connection = connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - assertNotNull(session); - - Properties props = new Properties(); - props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - props.setProperty(Context.PROVIDER_URL, "tcp://0.0.0.0:0"); - ctx = null; - try { - ctx = new InitialContext(props); - } - catch (NoClassDefFoundError e) { - throw new NamingException(e.toString()); - } - catch (Exception e) { - throw new NamingException(e.toString()); - } - Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C"); - consumer = session.createConsumer(destination); - consumer.receive(10000); - } - catch (Exception e) { - // Expect an exception here from the interrupt. - } - finally { - // next line is the nature of the test, if I remove this - // line, everything works OK - try { - consumer.close(); - } - catch (JMSException e) { - fail("Consumer Close failed with" + e.getMessage()); - } - try { - session.close(); - } - catch (JMSException e) { - fail("Session Close failed with" + e.getMessage()); - } - try { - connection.close(); - } - catch (JMSException e) { - fail("Connection Close failed with" + e.getMessage()); - } - try { - ctx.close(); - } - catch (Exception e) { - fail("Connection Close failed with" + e.getMessage()); - } - } - } - }; - client.start(); - Thread.sleep(5000); - client.interrupt(); - client.join(); - Thread.sleep(2000); - Thread[] remainThreads = new Thread[tg.activeCount()]; - tg.enumerate(remainThreads); - for (Thread t : remainThreads) { - if (t.isAlive() && !t.isDaemon()) - fail("Remaining thread: " + t.toString()); - } - - ThreadGroup root = Thread.currentThread().getThreadGroup().getParent(); - while (root.getParent() != null) { - root = root.getParent(); - } - visit(root, 0); - } - - // This method recursively visits all thread groups under `group'. - public static void visit(ThreadGroup group, int level) { - // Get threads in `group' - int numThreads = group.activeCount(); - Thread[] threads = new Thread[numThreads * 2]; - numThreads = group.enumerate(threads, false); - - // Enumerate each thread in `group' - for (int i = 0; i < numThreads; i++) { - // Get thread - Thread thread = threads[i]; - LOG.debug("Thread:" + thread.getName() + " is still running"); - } - - // Get thread subgroups of `group' - int numGroups = group.activeGroupCount(); - ThreadGroup[] groups = new ThreadGroup[numGroups * 2]; - numGroups = group.enumerate(groups, false); - - // Recursively visit each subgroup - for (int i = 0; i < numGroups; i++) { - visit(groups[i], level + 1); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java deleted file mode 100644 index 614631f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; - -import org.apache.activemq.util.ClassLoadingAwareObjectInputStream; -import org.junit.Before; -import org.junit.Test; - -/** - * Quick port to java to support AMQ build. - * - * This test demonstrates the classloader problem in the - * ClassLoadingAwareObjectInputStream impl. If the first interface in the proxy - * interfaces list is JDK and there are any subsequent interfaces that are NOT - * JDK interfaces the ClassLoadingAwareObjectInputStream will ignore their - * respective classloaders and cause the Proxy to throw an - * IllegalArgumentException because the core JDK classloader can't load the - * interfaces that are not JDK interfaces. - * - * See AMQ-3537 - * - * @author jason.yankus - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class AMQ3537Test implements InvocationHandler, Serializable { - - private static final long serialVersionUID = 1L; - - /** - * If the first and second element in this array are swapped, the test will - * fail. - */ - public static final Class[] TEST_CLASSES = new Class[]{List.class, NonJDKList.class, Serializable.class}; - - /** - * Underlying list - */ - private final List l = new ArrayList(); - - @Before - public void setUp() throws Exception { - l.add("foo"); - } - - @Test - public void testDeserializeProxy() throws Exception { - // create the proxy - List proxy = (List) java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(), TEST_CLASSES, this); - - // serialize it - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(proxy); - byte[] serializedProxy = baos.toByteArray(); - oos.close(); - baos.close(); - - // deserialize the proxy - ClassLoadingAwareObjectInputStream claois = new ClassLoadingAwareObjectInputStream(new ByteArrayInputStream(serializedProxy)); - - // this is where it fails due to the rudimentary classloader selection - // in ClassLoadingAwareObjectInputStream - List deserializedProxy = (List) claois.readObject(); - - claois.close(); - - // assert the invocation worked - assertEquals("foo", deserializedProxy.get(0)); - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - return method.invoke(l, args); - } - - public interface NonJDKList { - - int size(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java deleted file mode 100644 index c567eb3..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.fail; - -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.DefaultTestAppender; -import org.apache.log4j.Appender; -import org.apache.log4j.Level; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Claudio Corsi - */ -public class AMQ3567Test { - - private static Logger logger = LoggerFactory.getLogger(AMQ3567Test.class); - - private ActiveMQConnectionFactory factory; - private Connection connection; - private Session sessionWithListener, session; - private Queue destination; - private MessageConsumer consumer; - private Thread thread; - private BrokerService broker; - private String connectionUri; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - startBroker(); - initializeConsumer(); - startConsumer(); - } - - @Test - public void runTest() throws Exception { - produceSingleMessage(); - org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger("org.apache.activemq.util.ServiceSupport"); - final AtomicBoolean failed = new AtomicBoolean(false); - - Appender appender = new DefaultTestAppender() { - @Override - public void doAppend(LoggingEvent event) { - if (event.getThrowableInformation() != null) { - if (event.getThrowableInformation().getThrowable() instanceof InterruptedException) { - InterruptedException ie = (InterruptedException) event.getThrowableInformation().getThrowable(); - if (ie.getMessage().startsWith("Could not stop service:")) { - logger.info("Received an interrupted exception : ", ie); - failed.set(true); - } - } - } - } - }; - log4jLogger.addAppender(appender); - - Level level = log4jLogger.getLevel(); - log4jLogger.setLevel(Level.DEBUG); - - try { - stopConsumer(); - stopBroker(); - if (failed.get()) { - fail("An Interrupt exception was generated"); - } - - } - finally { - log4jLogger.setLevel(level); - log4jLogger.removeAppender(appender); - } - } - - private void startBroker() throws Exception { - broker = new BrokerService(); - broker.setDataDirectory("target/data"); - connectionUri = broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=30000&transport.closeAsync=false&transport.threadName&soTimeout=60000&transport.keepAlive=false&transport.useInactivityMonitor=false").getPublishableConnectString(); - broker.start(true); - broker.waitUntilStarted(); - } - - private void stopBroker() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - private void initializeConsumer() throws JMSException { - logger.info("Initializing the consumer messagor that will just not do anything...."); - factory = new ActiveMQConnectionFactory(); - factory.setBrokerURL("failover:(" + connectionUri + "?wireFormat.maxInactivityDuration=30000&keepAlive=true&soTimeout=60000)?jms.watchTopicAdvisories=false&jms.useAsyncSend=false&jms.dispatchAsync=true&jms.producerWindowSize=10485760&jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=true&InitialReconnectDelay=1000&maxReconnectDelay=10000&maxReconnectAttempts=400&useExponentialBackOff=true"); - connection = factory.createConnection(); - connection.start(); - sessionWithListener = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = sessionWithListener.createQueue("EMPTY.QUEUE"); - } - - private void startConsumer() throws Exception { - logger.info("Starting the consumer"); - consumer = sessionWithListener.createConsumer(destination); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - logger.info("Received a message: " + message); - } - - }); - - thread = new Thread(new Runnable() { - - private Session session; - - @Override - public void run() { - try { - destination = session.createQueue("EMPTY.QUEUE"); - MessageConsumer consumer = session.createConsumer(destination); - for (int cnt = 0; cnt < 2; cnt++) { - Message message = consumer.receive(50000); - logger.info("Received message: " + message); - } - } - catch (JMSException e) { - logger.debug("Received an exception while processing messages", e); - } - finally { - try { - session.close(); - } - catch (JMSException e) { - logger.debug("Received an exception while closing session", e); - } - } - } - - public Runnable setSession(Session session) { - this.session = session; - return this; - } - - }.setSession(session)) { - { - start(); - } - }; - } - - private void stopConsumer() throws JMSException { - logger.info("Stopping the consumer"); - try { - thread.join(); - } - catch (InterruptedException e) { - logger.debug("Received an exception while waiting for thread to complete", e); - } - if (sessionWithListener != null) { - sessionWithListener.close(); - } - if (connection != null) { - connection.stop(); - } - } - - private void produceSingleMessage() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); - factory.setBrokerURL(connectionUri); - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("EMPTY.QUEUE"); - MessageProducer producer = session.createProducer(destination); - producer.send(session.createTextMessage("Single Message")); - producer.close(); - session.close(); - connection.close(); - } -}