activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: NO-JIRA: Removing Flow Control tests
Date Sat, 01 Apr 2017 15:18:12 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 5c6c42b94 -> 9c472013e


NO-JIRA: Removing Flow Control tests

These tests are duplicated on the regular testsuite and they were fixed there


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9c472013
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9c472013
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9c472013

Branch: refs/heads/master
Commit: 9c472013e273fc92f9a1f6b06192897af2a4ab91
Parents: 5c6c42b
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Sat Apr 1 11:17:06 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Sat Apr 1 11:17:08 2017 -0400

----------------------------------------------------------------------
 .../ProducerFlowControlSendFailTest.java        | 177 ----------
 .../activemq/ProducerFlowControlTest.java       | 353 -------------------
 2 files changed, 530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c472013/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
deleted file mode 100644
index 06aeeee..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
-
-public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService service = new BrokerService();
-      service.setPersistent(false);
-      service.setUseJmx(false);
-
-      // Setup a destination policy where it takes only 1 message at a time.
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry policy = new PolicyEntry();
-      policy.setMemoryLimit(1);
-      policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
-      policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
-      policy.setProducerFlowControl(true);
-      policyMap.setDefaultEntry(policy);
-      service.setDestinationPolicy(policyMap);
-
-      service.getSystemUsage().setSendFailIfNoSpace(true);
-
-      connector = service.addConnector("tcp://localhost:0");
-      return service;
-   }
-
-   @Override
-   public void test2ndPublisherWithStandardConnectionThatIsBlocked() throws Exception {
-      // with sendFailIfNoSpace set, there is no blocking of the connection
-   }
-
-   @Override
-   public void testAsyncPublisherRecoverAfterBlock() throws Exception {
-      // sendFail means no flowControllwindow as there is no producer ack, just an exception
-   }
-
-   @Override
-   public void testPublisherRecoverAfterBlock() throws Exception {
-      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-      // with sendFail, there must be no flowControllwindow
-      // sendFail is an alternative flow control mechanism that does not block
-      factory.setUseAsyncSend(true);
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      final MessageProducer producer = session.createProducer(queueA);
-
-      final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
-      Thread thread = new Thread("Filler") {
-         @Override
-         public void run() {
-            while (keepGoing.get()) {
-               try {
-                  producer.send(session.createTextMessage("Test message"));
-                  if (gotResourceException.get()) {
-                     // do not flood the broker with requests when full as we are sending
async and they
-                     // will be limited by the network buffers
-                     Thread.sleep(200);
-                  }
-               } catch (Exception e) {
-                  // with async send, there will be no exceptions
-                  e.printStackTrace();
-               }
-            }
-         }
-      };
-      thread.start();
-      waitForBlockedOrResourceLimit(new AtomicBoolean(false));
-
-      // resourceException on second message, resumption if we
-      // can receive 10
-      MessageConsumer consumer = session.createConsumer(queueA);
-      TextMessage msg;
-      for (int idx = 0; idx < 10; ++idx) {
-         msg = (TextMessage) consumer.receive(1000);
-         if (msg != null) {
-            msg.acknowledge();
-         }
-      }
-      keepGoing.set(false);
-   }
-
-   public void testPublisherRecoverAfterBlockWithSyncSend() throws Exception {
-      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-      factory.setExceptionListener(null);
-      factory.setUseAsyncSend(false);
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      final MessageProducer producer = session.createProducer(queueA);
-
-      final AtomicBoolean keepGoing = new AtomicBoolean(true);
-      final AtomicInteger exceptionCount = new AtomicInteger(0);
-      Thread thread = new Thread("Filler") {
-         @Override
-         public void run() {
-            while (keepGoing.get()) {
-               try {
-                  producer.send(session.createTextMessage("Test message"));
-               } catch (JMSException arg0) {
-                  if (arg0 instanceof ResourceAllocationException) {
-                     gotResourceException.set(true);
-                     exceptionCount.incrementAndGet();
-                  }
-               }
-            }
-         }
-      };
-      thread.start();
-      waitForBlockedOrResourceLimit(new AtomicBoolean(false));
-
-      // resourceException on second message, resumption if we
-      // can receive 10
-      MessageConsumer consumer = session.createConsumer(queueA);
-      TextMessage msg;
-      for (int idx = 0; idx < 10; ++idx) {
-         msg = (TextMessage) consumer.receive(1000);
-         if (msg != null) {
-            msg.acknowledge();
-         }
-      }
-      assertTrue("we were blocked at least 5 times", 5 < exceptionCount.get());
-      keepGoing.set(false);
-   }
-
-   @Override
-   protected ConnectionFactory createConnectionFactory() throws Exception {
-      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
-      connectionFactory.setExceptionListener(new ExceptionListener() {
-         @Override
-         public void onException(JMSException arg0) {
-            if (arg0 instanceof ResourceAllocationException) {
-               gotResourceException.set(true);
-            }
-         }
-      });
-      return connectionFactory;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c472013/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
deleted file mode 100644
index 721da2c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ProducerFlowControlTest extends JmsTestSupport {
-
-   static final Logger LOG = LoggerFactory.getLogger(ProducerFlowControlTest.class);
-   ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
-   ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
-   protected TransportConnector connector;
-   protected ActiveMQConnection connection;
-   // used to test sendFailIfNoSpace on SystemUsage
-   protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
-
-   public void test2ndPublisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception
{
-      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-      factory.setProducerWindowSize(1024 * 64);
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createConsumer(queueB);
-
-      // Test sending to Queue A
-      // 1 few sends should not block until the producer window is used up.
-      fillQueue(queueA);
-
-      // Test sending to Queue B it should not block since the connection
-      // should not be blocked.
-      CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
-      assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
-
-      TextMessage msg = (TextMessage) consumer.receive();
-      assertEquals("Message 1", msg.getText());
-      msg.acknowledge();
-
-      pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
-      assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
-
-      msg = (TextMessage) consumer.receive();
-      assertEquals("Message 2", msg.getText());
-      msg.acknowledge();
-   }
-
-   public void testPublisherRecoverAfterBlock() throws Exception {
-      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      final MessageProducer producer = session.createProducer(queueA);
-
-      final AtomicBoolean done = new AtomicBoolean(true);
-      final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
-      Thread thread = new Thread("Filler") {
-         int i;
-
-         @Override
-         public void run() {
-            while (keepGoing.get()) {
-               done.set(false);
-               try {
-                  producer.send(session.createTextMessage("Test message " + ++i));
-                  LOG.info("sent: " + i);
-               } catch (JMSException e) {
-               }
-            }
-         }
-      };
-      thread.start();
-      waitForBlockedOrResourceLimit(done);
-
-      // after receiveing messges, producer should continue sending messages
-      // (done == false)
-      MessageConsumer consumer = session.createConsumer(queueA);
-      TextMessage msg;
-      for (int idx = 0; idx < 5; ++idx) {
-         msg = (TextMessage) consumer.receive(1000);
-         LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
-         msg.acknowledge();
-      }
-      Thread.sleep(1000);
-      keepGoing.set(false);
-
-      assertFalse("producer has resumed", done.get());
-   }
-
-   public void testAsyncPublisherRecoverAfterBlock() throws Exception {
-      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-      factory.setProducerWindowSize(1024 * 5);
-      factory.setUseAsyncSend(true);
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      final MessageProducer producer = session.createProducer(queueA);
-
-      final AtomicBoolean done = new AtomicBoolean(true);
-      final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
-      Thread thread = new Thread("Filler") {
-         int i;
-
-         @Override
-         public void run() {
-            while (keepGoing.get()) {
-               done.set(false);
-               try {
-                  producer.send(session.createTextMessage("Test message " + ++i));
-                  LOG.info("sent: " + i);
-               } catch (JMSException e) {
-               }
-            }
-         }
-      };
-      thread.start();
-      waitForBlockedOrResourceLimit(done);
-
-      // after receiveing messges, producer should continue sending messages
-      // (done == false)
-      MessageConsumer consumer = session.createConsumer(queueA);
-      TextMessage msg;
-      for (int idx = 0; idx < 5; ++idx) {
-         msg = (TextMessage) consumer.receive(1000);
-         assertNotNull("Got a message", msg);
-         LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
-         msg.acknowledge();
-      }
-      Thread.sleep(1000);
-      keepGoing.set(false);
-
-      assertFalse("producer has resumed", done.get());
-   }
-
-   public void test2ndPublisherWithSyncSendConnectionThatIsBlocked() throws Exception {
-      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-      factory.setAlwaysSyncSend(true);
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createConsumer(queueB);
-
-      // Test sending to Queue A
-      // 1st send should not block. But the rest will.
-      fillQueue(queueA);
-
-      // Test sending to Queue B it should not block.
-      CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
-      assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
-
-      TextMessage msg = (TextMessage) consumer.receive();
-      assertEquals("Message 1", msg.getText());
-      msg.acknowledge();
-
-      pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
-      assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
-
-      msg = (TextMessage) consumer.receive();
-      assertEquals("Message 2", msg.getText());
-      msg.acknowledge();
-   }
-
-   public void testSimpleSendReceive() throws Exception {
-      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-      factory.setAlwaysSyncSend(true);
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createConsumer(queueA);
-
-      // Test sending to Queue B it should not block.
-      CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
-      assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
-
-      TextMessage msg = (TextMessage) consumer.receive();
-      assertEquals("Message 1", msg.getText());
-      msg.acknowledge();
-
-      pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
-      assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
-
-      msg = (TextMessage) consumer.receive();
-      assertEquals("Message 2", msg.getText());
-      msg.acknowledge();
-   }
-
-   public void test2ndPublisherWithStandardConnectionThatIsBlocked() throws Exception {
-      ConnectionFactory factory = createConnectionFactory();
-      connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.start();
-
-      // Test sending to Queue A
-      // 1st send should not block.
-      fillQueue(queueA);
-
-      // Test sending to Queue B it should block.
-      // Since even though the it's queue limits have not been reached, the
-      // connection
-      // is blocked.
-      CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
-      assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
-   }
-
-   private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException
{
-      final AtomicBoolean done = new AtomicBoolean(true);
-      final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
-      // Starts an async thread that every time it publishes it sets the done
-      // flag to false.
-      // Once the send starts to block it will not reset the done flag
-      // anymore.
-      new Thread("Fill thread.") {
-         @Override
-         public void run() {
-            Session session = null;
-            try {
-               session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = session.createProducer(queue);
-               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-               while (keepGoing.get()) {
-                  done.set(false);
-                  producer.send(session.createTextMessage("Hello World"));
-               }
-            } catch (JMSException e) {
-            } finally {
-               safeClose(session);
-            }
-         }
-      }.start();
-
-      waitForBlockedOrResourceLimit(done);
-      keepGoing.set(false);
-   }
-
-   protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException
{
-      while (true) {
-         Thread.sleep(1000);
-         // the producer is blocked once the done flag stays true or there is a resource
exception
-         if (done.get() || gotResourceException.get()) {
-            break;
-         }
-         done.set(true);
-      }
-   }
-
-   private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws
JMSException {
-      final CountDownLatch done = new CountDownLatch(1);
-      new Thread("Send thread.") {
-         @Override
-         public void run() {
-            Session session = null;
-            try {
-               session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = session.createProducer(queue);
-               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-               producer.send(session.createTextMessage(message));
-               done.countDown();
-            } catch (JMSException e) {
-            } finally {
-               safeClose(session);
-            }
-         }
-      }.start();
-      return done;
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService service = new BrokerService();
-      service.setPersistent(false);
-      service.setUseJmx(false);
-
-      // Setup a destination policy where it takes only 1 message at a time.
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry policy = new PolicyEntry();
-      policy.setMemoryLimit(1);
-      policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
-      policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
-      policy.setProducerFlowControl(true);
-      policyMap.setDefaultEntry(policy);
-      service.setDestinationPolicy(policyMap);
-
-      connector = service.addConnector("tcp://localhost:0");
-      return service;
-   }
-
-   @Override
-   public void setUp() throws Exception {
-      setAutoFail(true);
-      super.setUp();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      if (connection != null) {
-         TcpTransport t = connection.getTransport().narrow(TcpTransport.class);
-         t.getTransportListener().onException(new IOException("Disposed."));
-         connection.getTransport().stop();
-      }
-      super.tearDown();
-   }
-
-   @Override
-   protected ConnectionFactory createConnectionFactory() throws Exception {
-      return new ActiveMQConnectionFactory(connector.getConnectUri());
-   }
-}


Mime
View raw message