activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2755 Improve SoakPagingTest stability
Date Sun, 10 May 2020 03:45:04 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 74598c8  ARTEMIS-2755 Improve SoakPagingTest stability
     new 56f2654  This closes #3123
74598c8 is described below

commit 74598c88df49c76da10ef6022d15ba66b5ef8349
Author: brusdev <bruscinodf@gmail.com>
AuthorDate: Fri May 8 20:28:14 2020 +0200

    ARTEMIS-2755 Improve SoakPagingTest stability
    
    Await producers and consumer creation before to await the test running time.
---
 .../smoke/replicationflow/SoakPagingTest.java      | 53 +++++++++++++++-------
 1 file changed, 36 insertions(+), 17 deletions(-)

diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
index 521cce6..b09388b 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
@@ -27,15 +27,14 @@ import javax.jms.Session;
 import javax.jms.Topic;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
-import org.apache.activemq.artemis.utils.RetryRule;
 import org.apache.activemq.artemis.utils.SpawnedVMSupport;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -43,9 +42,6 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class SoakPagingTest extends SmokeTestBase {
 
-   @Rule
-   public RetryRule retryRule = new RetryRule(1);
-
    public static final int LAG_CONSUMER_TIME = 1000;
    public static final int TIME_RUNNING = 4000;
    public static final int CLIENT_KILLS = 2;
@@ -91,7 +87,8 @@ public class SoakPagingTest extends SmokeTestBase {
 
    static final int consumer_threads = 20;
    static final int producer_threads = 20;
-   static AtomicInteger j = new AtomicInteger(0);
+   static AtomicInteger producer_count = new AtomicInteger(0);
+   static AtomicInteger consumer_count = new AtomicInteger(0);
 
    private static ConnectionFactory createConnectionFactory(String protocol, String uri)
{
       if (protocol.toUpperCase().equals("OPENWIRE")) {
@@ -132,12 +129,15 @@ public class SoakPagingTest extends SmokeTestBase {
 
          final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host
+ ":" + port);
 
+         CountDownLatch producersLatch = new CountDownLatch(producer_threads);
+         CountDownLatch consumersLatch = new CountDownLatch(consumer_threads);
+
          for (int i = 0; i < producer_threads; i++) {
             Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                   SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
-                  app.produce(factory);
+                  app.produce(factory, producer_count.incrementAndGet(), producersLatch);
                }
             });
             t.start();
@@ -150,16 +150,27 @@ public class SoakPagingTest extends SmokeTestBase {
                @Override
                public void run() {
                   SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
-                  app.consume(factory, j.getAndIncrement());
+                  app.consume(factory, consumer_count.getAndIncrement(), consumersLatch);
                }
             });
             t.start();
          }
+
+         System.out.println("Awaiting producers...");
+         producersLatch.await();
+
+         System.out.println("Awaiting consumers...");
+         consumersLatch.await();
+
+         System.out.println("Awaiting timeout...");
          Thread.sleep(time);
 
-         System.exit(consumed.get() > 0 ? 1 : 0);
-      } catch (Throwable e) {
-         e.printStackTrace();
+         int exitStatus = consumed.get() > 0 ? 1 : 0;
+         System.out.println("Exiting with the status: " + exitStatus);
+         System.exit(exitStatus);
+      } catch (Throwable t) {
+         System.err.println("Exiting with the status 0. Reason: " + t);
+         t.printStackTrace();
          System.exit(0);
       }
 
@@ -178,7 +189,7 @@ public class SoakPagingTest extends SmokeTestBase {
       }
    }
 
-   public void produce(ConnectionFactory factory) {
+   public void produce(ConnectionFactory factory, int index, CountDownLatch latch) {
       try {
 
          StringBuffer bufferlarge = new StringBuffer();
@@ -187,7 +198,11 @@ public class SoakPagingTest extends SmokeTestBase {
          }
          Connection connection = factory.createConnection("admin", "admin");
 
+         latch.countDown();
+
          connection.start();
+         System.out.println("Producer" + index + " started");
+
          final Session session;
 
          if (transaction) {
@@ -220,18 +235,19 @@ public class SoakPagingTest extends SmokeTestBase {
             produced.incrementAndGet();
             i++;
             if (i % 100 == 0) {
-               System.out.println("Published " + i + " messages");
+               System.out.println("Producer" + index + " published " + i + " messages");
                if (transaction) {
                   session.commit();
                }
             }
          }
       } catch (Exception e) {
+         System.err.println("Error on Producer" + index + ": " + e.getMessage());
          e.printStackTrace();
       }
    }
 
-   public void consume(ConnectionFactory factory, int j) {
+   public void consume(ConnectionFactory factory, int index, CountDownLatch latch) {
       try {
          Connection connection = factory.createConnection("admin", "admin");
 
@@ -251,7 +267,7 @@ public class SoakPagingTest extends SmokeTestBase {
             address = session.createTopic(destination);
          }
 
-         String consumerId = "ss" + (j % 5);
+         String consumerId = "ss" + (index % 5);
          MessageConsumer messageConsumer;
 
          if (protocol.equals("shared")) {
@@ -262,23 +278,26 @@ public class SoakPagingTest extends SmokeTestBase {
 
          if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME);
 
+         latch.countDown();
          connection.start();
+         System.out.println("Consumer" + index + " started");
 
          int i = 0;
          while (true) {
             Message m = messageConsumer.receive(1000);
             consumed.incrementAndGet();
             if (m == null)
-               System.out.println("receive() returned null");
+               System.out.println("Consumer" + index + "received null");
             i++;
             if (i % 100 == 0) {
-               System.out.println("Consumed " + i + " messages");
+               System.out.println("Consumer" + index + "received " + i + " messages");
                if (transaction) {
                   session.commit();
                }
             }
          }
       } catch (Exception e) {
+         System.err.println("Error on Consumer" + index + ": " + e.getMessage());
          e.printStackTrace();
       }
    }


Mime
View raw message