activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/3] activemq-artemis git commit: NO-JIRA: Fixing test hanging on OpenWire
Date Fri, 31 Mar 2017 14:25:21 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 690b8d24d -> 33fff5265


NO-JIRA: Fixing test hanging on OpenWire


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d779afe8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d779afe8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d779afe8

Branch: refs/heads/master
Commit: d779afe874abdeebbfc893ddbf78c2c6f49d9ef5
Parents: 690b8d2
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Mar 30 21:56:46 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Mar 30 21:56:46 2017 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |   9 ++
 .../amq/ProducerFlowControlBaseTest.java        | 158 +++++++++++++++++++
 .../amq/ProducerFlowControlSendFailTest.java    |  62 +++-----
 .../openwire/amq/ProducerFlowControlTest.java   | 128 +--------------
 4 files changed, 190 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 60a8dca..60876b9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1371,6 +1371,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          session.getCoreSession().resetTX(tx);
          try {
             session.send(producerInfo, messageSend, sendProducerAck);
+         } catch (Exception e) {
+            if (tx != null) {
+               tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
+            }
+            throw e;
          } finally {
             session.getCoreSession().resetTX(null);
          }
@@ -1387,6 +1392,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          try {
             AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
             consumerBrokerExchange.acknowledge(ack);
+         } catch (Exception e) {
+            if (tx != null) {
+               tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
+            }
          } finally {
             session.getCoreSession().resetTX(null);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java
new file mode 100644
index 0000000..2166201
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.junit.After;
+import org.junit.Before;
+
+public class ProducerFlowControlBaseTest extends BasicOpenWireTest {
+   ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+   ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+   protected ActiveMQConnection flowControlConnection;
+   // used to test sendFailIfNoSpace on SystemUsage
+   protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
+   private Thread asyncThread = null;
+
+
+   protected void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException
{
+      final AtomicBoolean done = new AtomicBoolean(true);
+      final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+      try {
+         // Starts an async thread that every time it publishes it sets the done
+         // flag to false.
+         // Once the send starts to block it will not reset the done flag
+         // anymore.
+         asyncThread = new Thread("Fill thread.") {
+            @Override
+            public void run() {
+               Session session = null;
+               try {
+                  session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                  MessageProducer producer = session.createProducer(queue);
+                  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                  while (keepGoing.get()) {
+                     done.set(false);
+                     producer.send(session.createTextMessage("Hello World"));
+                  }
+               } catch (JMSException e) {
+               } finally {
+                  safeClose(session);
+               }
+            }
+         };
+         asyncThread.start();
+
+         waitForBlockedOrResourceLimit(done);
+      } finally {
+         keepGoing.set(false);
+      }
+   }
+
+   protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException
{
+      while (true) {
+         Thread.sleep(100);
+         // the producer is blocked once the done flag stays true or there is a
+         // resource exception
+         if (done.get() || gotResourceException.get()) {
+            break;
+         }
+         done.set(true);
+      }
+   }
+
+   protected CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message)
throws JMSException {
+      final CountDownLatch done = new CountDownLatch(1);
+      new Thread("Send thread.") {
+         @Override
+         public void run() {
+            Session session = null;
+            try {
+               session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer producer = session.createProducer(queue);
+               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+               producer.send(session.createTextMessage(message));
+               done.countDown();
+            } catch (JMSException e) {
+               e.printStackTrace();
+            } finally {
+               safeClose(session);
+            }
+         }
+      }.start();
+      return done;
+   }
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      String match = "#";
+      Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
+      asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      this.makeSureCoreQueueExist("QUEUE.A");
+      this.makeSureCoreQueueExist("QUEUE.B");
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         if (flowControlConnection != null) {
+            TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
+            try {
+               flowControlConnection.getTransport().stop();
+               flowControlConnection.close();
+            } catch (Throwable ignored) {
+               // sometimes the disposed up can make the test to fail
+               // even worse I have seen this breaking every single test after this
+               // if not caught here
+            }
+            t.getTransportListener().onException(new IOException("Disposed."));
+         }
+         if (asyncThread != null) {
+            asyncThread.join();
+            asyncThread = null;
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
index baacd16..e03ae27 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java
@@ -34,13 +34,14 @@ import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 /**
  * adapted from: org.apache.activemq.ProducerFlowControlSendFailTest
  */
-public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlBaseTest {
 
    @Override
    @Before
@@ -61,20 +62,8 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
       asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
    }
 
-   @Override
-   public void test2ndPublisherWithStandardConnectionThatIsBlocked() throws Exception {
-      // with sendFailIfNoSpace set, there is no blocking of the connection
-   }
-
-   @Override
-   public void testAsyncPublisherRecoverAfterBlock() throws Exception {
-      // sendFail means no flowControllwindow as there is no producer ack, just
-      // an exception
-   }
-
-   @Override
    @Test
-   public void testPublisherRecoverAfterBlock() throws Exception {
+   public void testPublishWithTX() throws Exception {
       ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) getConnectionFactory();
       // with sendFail, there must be no flowControllwindow
       // sendFail is an alternative flow control mechanism that does not block
@@ -82,45 +71,38 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
       this.flowControlConnection = (ActiveMQConnection) factory.createConnection();
       this.flowControlConnection.start();
 
-      final Session session = this.flowControlConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      final Session session = this.flowControlConnection.createSession(true, Session.SESSION_TRANSACTED);
       final MessageProducer producer = session.createProducer(queueA);
 
-      final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
-      Thread thread = new Thread("Filler") {
-         @Override
-         public void run() {
-            while (keepGoing.get()) {
-               try {
-                  producer.send(session.createTextMessage("Test message"));
-                  if (gotResourceException.get()) {
-                     System.out.println("got exception");
-                     // do not flood the broker with requests when full as we
-                     // are sending async and they
-                     // will be limited by the network buffers
-                     Thread.sleep(200);
-                  }
-               } catch (Exception e) {
-                  // with async send, there will be no exceptions
-                  e.printStackTrace();
-               }
-            }
+      int successSent = 0;
+      boolean exception = false;
+      try {
+         for (int i = 0; i < 5000; i++) {
+            producer.send(session.createTextMessage("Test message"));
+            session.commit();
+            successSent++;
          }
-      };
-      thread.start();
-      waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+      } catch (Exception e) {
+         exception = true;
+         // with async send, there will be no exceptions
+         e.printStackTrace();
+      }
+
+      Assert.assertTrue(exception);
 
       // resourceException on second message, resumption if we
       // can receive 10
       MessageConsumer consumer = session.createConsumer(queueA);
       TextMessage msg;
-      for (int idx = 0; idx < 10; ++idx) {
+      for (int idx = 0; idx < successSent; ++idx) {
          msg = (TextMessage) consumer.receive(1000);
+         Assert.assertNotNull(msg);
+         System.out.println("Received " + msg);
          if (msg != null) {
             msg.acknowledge();
          }
+         session.commit();
       }
-      keepGoing.set(false);
       consumer.close();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d779afe8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
index c085d0f..bde8b79 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java
@@ -16,40 +16,22 @@
  */
 package org.apache.activemq.artemis.tests.integration.openwire.amq;
 
-import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
  * adapted from: org.apache.activemq.ProducerFlowControlTest
  */
-public class ProducerFlowControlTest extends BasicOpenWireTest {
-
-   ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
-   ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
-   protected ActiveMQConnection flowControlConnection;
-   // used to test sendFailIfNoSpace on SystemUsage
-   protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
-   private Thread asyncThread = null;
+public class ProducerFlowControlTest extends ProducerFlowControlBaseTest {
 
    @Test
    public void test2ndPublisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception
{
@@ -247,112 +229,4 @@ public class ProducerFlowControlTest extends BasicOpenWireTest {
       CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
       assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
    }
-
-   private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException
{
-      final AtomicBoolean done = new AtomicBoolean(true);
-      final AtomicBoolean keepGoing = new AtomicBoolean(true);
-
-      // Starts an async thread that every time it publishes it sets the done
-      // flag to false.
-      // Once the send starts to block it will not reset the done flag
-      // anymore.
-      asyncThread = new Thread("Fill thread.") {
-         @Override
-         public void run() {
-            Session session = null;
-            try {
-               session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = session.createProducer(queue);
-               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-               while (keepGoing.get()) {
-                  done.set(false);
-                  producer.send(session.createTextMessage("Hello World"));
-               }
-            } catch (JMSException e) {
-            } finally {
-               safeClose(session);
-            }
-         }
-      };
-      asyncThread.start();
-
-      waitForBlockedOrResourceLimit(done);
-      keepGoing.set(false);
-   }
-
-   protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException
{
-      while (true) {
-         Thread.sleep(100);
-         System.out.println("check done: " + done.get() + " ex: " + gotResourceException.get());
-         // the producer is blocked once the done flag stays true or there is a
-         // resource exception
-         if (done.get() || gotResourceException.get()) {
-            break;
-         }
-         done.set(true);
-      }
-   }
-
-   private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws
JMSException {
-      final CountDownLatch done = new CountDownLatch(1);
-      new Thread("Send thread.") {
-         @Override
-         public void run() {
-            Session session = null;
-            try {
-               session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = session.createProducer(queue);
-               producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-               producer.send(session.createTextMessage(message));
-               done.countDown();
-            } catch (JMSException e) {
-               e.printStackTrace();
-            } finally {
-               safeClose(session);
-            }
-         }
-      }.start();
-      return done;
-   }
-
-   @Override
-   protected void extraServerConfig(Configuration serverConfig) {
-      String match = "#";
-      Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
-      asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-   }
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-      this.makeSureCoreQueueExist("QUEUE.A");
-      this.makeSureCoreQueueExist("QUEUE.B");
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      try {
-         if (flowControlConnection != null) {
-            TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
-            try {
-               flowControlConnection.getTransport().stop();
-               flowControlConnection.close();
-            } catch (Throwable ignored) {
-               // sometimes the disposed up can make the test to fail
-               // even worse I have seen this breaking every single test after this
-               // if not caught here
-            }
-            t.getTransportListener().onException(new IOException("Disposed."));
-         }
-         if (asyncThread != null) {
-            asyncThread.join();
-            asyncThread = null;
-         }
-      } finally {
-         super.tearDown();
-      }
-   }
-
 }


Mime
View raw message