From commits-return-60060-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Wed Apr 29 22:01:09 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2CC18180654 for ; Thu, 30 Apr 2020 00:01:09 +0200 (CEST) Received: (qmail 50055 invoked by uid 500); 29 Apr 2020 22:01:08 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 50044 invoked by uid 99); 29 Apr 2020 22:01:08 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2020 22:01:08 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 67F67890AF; Wed, 29 Apr 2020 22:01:08 +0000 (UTC) Date: Wed, 29 Apr 2020 22:01:08 +0000 To: "commits@activemq.apache.org" Subject: [activemq-artemis] branch master updated: ARTEMIS-2743 Synchronize JMS connection methods MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <158819766807.32344.17318640853732545570@gitbox.apache.org> From: clebertsuconic@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: activemq-artemis X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: d0bc946ecb8768abd435f34cc07d8a8a0ddabfcf X-Git-Newrev: 803ccf72292d7ad6202f500023845adb7aad503b X-Git-Rev: 803ccf72292d7ad6202f500023845adb7aad503b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git The following commit(s) were added to refs/heads/master by this push: new 803ccf7 ARTEMIS-2743 Synchronize JMS connection methods new 38de1f7 This closes #3106 803ccf7 is described below commit 803ccf72292d7ad6202f500023845adb7aad503b Author: Justin Bertram AuthorDate: Wed Apr 29 13:45:36 2020 -0500 ARTEMIS-2743 Synchronize JMS connection methods --- .../artemis/jms/client/ActiveMQConnection.java | 14 +++--- .../artemis/jms/client/ActiveMQXAConnection.java | 6 +-- .../integration/jms/client/ConnectionTest.java | 58 ++++++++++++++++++++++ 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index 9241ddf..b3026ad 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -192,7 +192,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme * For that reason we have this method to force that nonXASession, since the JMS Javadoc * mandates createSession to return a XASession. */ - public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION); @@ -206,7 +206,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme * For that reason we have this method to force that nonXASession, since the JMS Javadoc * mandates createSession to return a XASession. */ - public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION); @@ -220,7 +220,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme * For that reason we have this method to force that nonXASession, since the JMS Javadoc * mandates createSession to return a XASession. */ - public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION); @@ -432,14 +432,14 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme } @Override - public Session createSession(int sessionMode) throws JMSException { + public synchronized Session createSession(int sessionMode) throws JMSException { checkClosed(); return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION); } @Override - public Session createSession() throws JMSException { + public synchronized Session createSession() throws JMSException { checkClosed(); return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION); } @@ -447,7 +447,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme // QueueConnection implementation --------------------------------------------------------------- @Override - public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException { + public synchronized QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION); } @@ -477,7 +477,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme // TopicConnection implementation --------------------------------------------------------------- @Override - public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { + public synchronized TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException { checkClosed(); return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java index 0d6158e..c5eea98 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java @@ -48,20 +48,20 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA } @Override - public XASession createXASession() throws JMSException { + public synchronized XASession createXASession() throws JMSException { checkClosed(); return (XASession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION); } @Override - public XAQueueSession createXAQueueSession() throws JMSException { + public synchronized XAQueueSession createXAQueueSession() throws JMSException { checkClosed(); return (XAQueueSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION); } @Override - public XATopicSession createXATopicSession() throws JMSException { + public synchronized XATopicSession createXATopicSession() throws JMSException { checkClosed(); return (XATopicSession) createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java index 259162e..d1ed5ba 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java @@ -20,6 +20,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.InvalidClientIDException; import javax.jms.JMSContext; +import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.Session; @@ -31,15 +32,23 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Test; public class ConnectionTest extends JMSTestBase { + private static final Logger log = Logger.getLogger(ConnectionTest.class); + private Connection conn2; @Test @@ -248,6 +257,55 @@ public class ConnectionTest extends JMSTestBase { } } + @Test + public void testCreateSessionAndCloseConnectionConcurrently() throws Exception { + final int ATTEMPTS = 10; + final int THREAD_COUNT = 50; + final int SESSION_COUNT = 10; + final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + try { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); + + for (int i = 0; i < ATTEMPTS; i++) { + final CountDownLatch lineUp = new CountDownLatch(THREAD_COUNT); + final AtomicBoolean error = new AtomicBoolean(false); + final Connection connection = cf.createConnection(); + + for (int j = 0; j < THREAD_COUNT; ++j) { + executor.execute(() -> { + for (int k = 0; k < SESSION_COUNT; k++) { + try { + connection.createSession().close(); + if (k == 0) { + lineUp.countDown(); + } + } catch (javax.jms.IllegalStateException e) { + // ignore + break; + } catch (JMSException e) { + // ignore + break; + } catch (Throwable t) { + log.warn(t.getMessage(), t); + error.set(true); + break; + } + } + }); + } + + // wait until all the threads have created & closed at least 1 session + assertTrue(lineUp.await(10, TimeUnit.SECONDS)); + connection.close(); + if (error.get()) { + assertFalse(error.get()); + } + } + } finally { + executor.shutdownNow(); + } + } + @Override @After public void tearDown() throws Exception {