activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [2/4] activemq-artemis git commit: ARTEMIS-301 - Adding test replicating Consumer::cancel concurrency issue
Date Wed, 11 Nov 2015 16:21:36 GMT
ARTEMIS-301 - Adding test replicating Consumer::cancel concurrency issue


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f0f886f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f0f886f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f0f886f5

Branch: refs/heads/master
Commit: f0f886f53af93c3a90c973f8b3ab80dc85efb889
Parents: 74c20ea
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Nov 6 16:27:40 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Nov 10 14:29:34 2015 -0500

----------------------------------------------------------------------
 .../byteman/ConcurrentDeliveryCancelTest.java   | 295 +++++++++++++++++++
 1 file changed, 295 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f0f886f5/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
new file mode 100644
index 0000000..c00985f
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQSession;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/** This test will force two consumers to be waiting on close and introduce a race I saw
in a production system */
+@RunWith(BMUnitRunner.class)
+public class ConcurrentDeliveryCancelTest extends JMSTestBase
+{
+
+   // used to wait the thread to align at the same place and create the race
+   private static final ReusableLatch latchEnter = new ReusableLatch(2);
+   // used to start
+   private static final ReusableLatch latchFlag = new ReusableLatch(1);
+
+   public static void enterCancel()
+   {
+      latchEnter.countDown();
+      try
+      {
+         latchFlag.await();
+      }
+      catch (Exception ignored)
+      {
+      }
+   }
+
+   public static void resetLatches(int numberOfThreads)
+   {
+      latchEnter.setCount(numberOfThreads);
+      latchFlag.setCount(1);
+   }
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "enterCancel-holdThere",
+                     targetClass = "org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl",
+                     targetMethod = "close",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();"
+                  )
+            }
+      )
+   public void testConcurrentCancels() throws Exception
+   {
+
+      server.getAddressSettingsRepository().clear();
+      AddressSettings settings = new AddressSettings();
+      settings.setMaxDeliveryAttempts(-1);
+      server.getAddressSettingsRepository().addMatch("#", settings);
+      ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616",
"test");
+      cf.setReconnectAttempts(0);
+      cf.setRetryInterval(10);
+
+
+      System.out.println(".....");
+      for (ServerSession srvSess : server.getSessions())
+      {
+         System.out.println(srvSess);
+      }
+
+      String queueName = RandomUtil.randomString();
+      Queue queue = createQueue(queueName);
+
+      int numberOfMessages = 10000;
+
+      {
+         Connection connection = cf.createConnection();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer producer = session.createProducer(queue);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            TextMessage msg = session.createTextMessage("message " + i);
+            msg.setIntProperty("i", i);
+            producer.send(msg);
+         }
+         session.commit();
+
+         connection.close();
+      }
+
+      for (int i = 0; i < 100; i++)
+      {
+         XAConnectionFactory xacf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616",
"test");
+
+         final XAConnection connection = xacf.createXAConnection();
+         final XASession theSession = connection.createXASession();
+         ((ActiveMQSession)theSession).getCoreSession().addMetaData("theSession", "true");
+
+         connection.start();
+
+         final MessageConsumer consumer = theSession.createConsumer(queue);
+
+         XidImpl xid =  newXID();
+         theSession.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         theSession.getXAResource().setTransactionTimeout(1); // I'm setting a small timeout
just because I'm lazy to call end myself
+
+         for (int msg = 0; msg < 11; msg++)
+         {
+            Assert.assertNotNull(consumer.receiveNoWait());
+         }
+
+         System.out.println(".....");
+
+         final List<ServerSession> serverSessions = new LinkedList<ServerSession>();
+
+         // We will force now the failure simultaneously from several places
+         for (ServerSession srvSess : server.getSessions())
+         {
+            if (srvSess.getMetaData("theSession") != null)
+            {
+               System.out.println(srvSess);
+               serverSessions.add(srvSess);
+            }
+         }
+
+
+         resetLatches(2); // from Transactional reaper
+
+         List<Thread> threads = new LinkedList<Thread>();
+
+         threads.add(new Thread("ConsumerCloser")
+         {
+            public void run()
+            {
+               try
+               {
+                  System.out.println(Thread.currentThread().getName() + " closing consumer");
+                  consumer.close();
+                  System.out.println(Thread.currentThread().getName() + " closed consumer");
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         threads.add(new Thread("SessionCloser")
+         {
+            public void run()
+            {
+               for (ServerSession sess : serverSessions)
+               {
+                  System.out.println("Thread " + Thread.currentThread().getName() + " starting");
+                  try
+                  {
+                     // A session.close could sneak in through failover or some other scenarios.
+                     // a call to RemotingConnection.fail wasn't replicating the issue.
+                     // I needed to call Session.close() directly to replicate what was happening
in production
+                     sess.close(true);
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+                  }
+                  System.out.println("Thread " + Thread.currentThread().getName() + " done");
+               }
+            }
+         });
+//
+//         consumer.close();
+//
+//         threads.add(new Thread("ClientFailing")
+//         {
+//            public void run()
+//            {
+//               ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession();
+//               impl.getConnection().fail(new HornetQException("failure"));
+//            }
+//         });
+//
+
+
+         for (Thread t : threads)
+         {
+            t.start();
+         }
+
+         Assert.assertTrue(latchEnter.await(10, TimeUnit.MINUTES));
+         latchFlag.countDown();
+
+         for (Thread t: threads)
+         {
+            t.join(5000);
+            Assert.assertFalse(t.isAlive());
+         }
+
+         connection.close();
+      }
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("myID");
+
+      Thread.sleep(2000); // I am too lazy to call end on all the transactions
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(queue);
+      HashMap<Integer, AtomicInteger> mapCount = new HashMap<Integer, AtomicInteger>();
+
+      while (true)
+      {
+         TextMessage message = (TextMessage)consumer.receiveNoWait();
+         if (message == null)
+         {
+            break;
+         }
+
+         Integer value = message.getIntProperty("i");
+
+         AtomicInteger count = mapCount.get(value);
+         if (count == null)
+         {
+            count = new AtomicInteger(0);
+            mapCount.put(message.getIntProperty("i"), count);
+         }
+
+         count.incrementAndGet();
+      }
+
+      boolean failed = false;
+      for (int i = 0; i < numberOfMessages; i++)
+      {
+         AtomicInteger count = mapCount.get(i);
+         if (count == null)
+         {
+            System.out.println("Message " + i + " not received");
+            failed = true;
+         }
+         else if (count.get() > 1)
+         {
+            System.out.println("Message " + i + " received " + count.get() + " times");
+            failed = true;
+         }
+      }
+
+      Assert.assertFalse("test failed, look at the system.out of the test for more infomration",
failed);
+
+      connection.close();
+
+
+   }
+}


Mime
View raw message