activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2743 Synchronize JMS connection methods
Date Wed, 29 Apr 2020 22:01:08 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 803ccf7  ARTEMIS-2743 Synchronize JMS connection methods
     new 38de1f7  This closes #3106
803ccf7 is described below

commit 803ccf72292d7ad6202f500023845adb7aad503b
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Wed Apr 29 13:45:36 2020 -0500

    ARTEMIS-2743 Synchronize JMS connection methods
---
 .../artemis/jms/client/ActiveMQConnection.java     | 14 +++---
 .../artemis/jms/client/ActiveMQXAConnection.java   |  6 +--
 .../integration/jms/client/ConnectionTest.java     | 58 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 10 deletions(-)

diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
index 9241ddf..b3026ad 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
@@ -192,7 +192,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
     * For that reason we have this method to force that nonXASession, since the JMS Javadoc
     * mandates createSession to return a XASession.
     */
-   public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
+   public synchronized Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException {
       checkClosed();
 
       return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION);
@@ -206,7 +206,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
     * For that reason we have this method to force that nonXASession, since the JMS Javadoc
     * mandates createSession to return a XASession.
     */
-   public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
+   public synchronized Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
       checkClosed();
 
       return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION);
@@ -220,7 +220,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
     * For that reason we have this method to force that nonXASession, since the JMS Javadoc
     * mandates createSession to return a XASession.
     */
-   public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
+   public synchronized Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
       checkClosed();
 
       return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION);
@@ -432,14 +432,14 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
    }
 
    @Override
-   public Session createSession(int sessionMode) throws JMSException {
+   public synchronized Session createSession(int sessionMode) throws JMSException {
       checkClosed();
       return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION);
 
    }
 
    @Override
-   public Session createSession() throws JMSException {
+   public synchronized Session createSession() throws JMSException {
       checkClosed();
       return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION);
    }
@@ -447,7 +447,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
    // QueueConnection implementation ---------------------------------------------------------------
 
    @Override
-   public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
+   public synchronized QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException {
       checkClosed();
       return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION);
    }
@@ -477,7 +477,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
    // TopicConnection implementation ---------------------------------------------------------------
 
    @Override
-   public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
+   public synchronized TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
       checkClosed();
       return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION);
    }
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java
index 0d6158e..c5eea98 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java
@@ -48,20 +48,20 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA
    }
 
    @Override
-   public XASession createXASession() throws JMSException {
+   public synchronized XASession createXASession() throws JMSException {
       checkClosed();
       return (XASession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION);
    }
 
    @Override
-   public XAQueueSession createXAQueueSession() throws JMSException {
+   public synchronized XAQueueSession createXAQueueSession() throws JMSException {
       checkClosed();
       return (XAQueueSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION);
 
    }
 
    @Override
-   public XATopicSession createXATopicSession() throws JMSException {
+   public synchronized XATopicSession createXATopicSession() throws JMSException {
       checkClosed();
       return (XATopicSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION);
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java
index 259162e..d1ed5ba 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java
@@ -20,6 +20,7 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSContext;
+import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -31,15 +32,23 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.jboss.logging.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class ConnectionTest extends JMSTestBase {
 
+   private static final Logger log = Logger.getLogger(ConnectionTest.class);
+
    private Connection conn2;
 
    @Test
@@ -248,6 +257,55 @@ public class ConnectionTest extends JMSTestBase {
       }
    }
 
+   @Test
+   public void testCreateSessionAndCloseConnectionConcurrently() throws Exception {
+      final int ATTEMPTS = 10;
+      final int THREAD_COUNT = 50;
+      final int SESSION_COUNT = 10;
+      final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+      try {
+         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
+
+         for (int i = 0; i < ATTEMPTS; i++) {
+            final CountDownLatch lineUp = new CountDownLatch(THREAD_COUNT);
+            final AtomicBoolean error = new AtomicBoolean(false);
+            final Connection connection = cf.createConnection();
+
+            for (int j = 0; j < THREAD_COUNT; ++j) {
+               executor.execute(() -> {
+                  for (int k = 0; k < SESSION_COUNT; k++) {
+                     try {
+                        connection.createSession().close();
+                        if (k == 0) {
+                           lineUp.countDown();
+                        }
+                     } catch (javax.jms.IllegalStateException e) {
+                        // ignore
+                        break;
+                     } catch (JMSException e) {
+                        // ignore
+                        break;
+                     } catch (Throwable t) {
+                        log.warn(t.getMessage(), t);
+                        error.set(true);
+                        break;
+                     }
+                  }
+               });
+            }
+
+            // wait until all the threads have created & closed at least 1 session
+            assertTrue(lineUp.await(10, TimeUnit.SECONDS));
+            connection.close();
+            if (error.get()) {
+               assertFalse(error.get());
+            }
+         }
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
    @Override
    @After
    public void tearDown() throws Exception {


Mime
View raw message