Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BA39F200B7C for ; Thu, 25 Aug 2016 00:23:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B8DEF160AB1; Wed, 24 Aug 2016 22:23:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 86DC5160AC2 for ; Thu, 25 Aug 2016 00:23:32 +0200 (CEST) Received: (qmail 95886 invoked by uid 500); 24 Aug 2016 22:23:31 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 95874 invoked by uid 99); 24 Aug 2016 22:23:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Aug 2016 22:23:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5A841E04AF; Wed, 24 Aug 2016 22:23:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbertram@apache.org To: commits@activemq.apache.org Date: Wed, 24 Aug 2016 22:23:32 -0000 Message-Id: <5cb1e2dee2d14f8796173c3cec8dbe28@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] activemq-artemis git commit: ARTEMIS-697 Making JChannelManager a singleton, and fixing tests archived-at: Wed, 24 Aug 2016 22:23:33 -0000 ARTEMIS-697 Making JChannelManager a singleton, and fixing tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/858d7a1a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/858d7a1a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/858d7a1a Branch: refs/heads/master Commit: 858d7a1a02ec18e253a9caba54e8d3cad9f04eed Parents: bf4796c Author: Clebert Suconic Authored: Wed Aug 24 10:31:04 2016 -0400 Committer: Clebert Suconic Committed: Wed Aug 24 18:08:17 2016 -0400 ---------------------------------------------------------------------- .../core/ChannelBroadcastEndpointFactory.java | 28 +--- .../JGroupsFileBroadcastEndpointFactory.java | 2 +- ...roupsPropertiesBroadcastEndpointFactory.java | 2 +- .../api/core/jgroups/JChannelManager.java | 38 ++++- .../api/core/jgroups/JChannelWrapper.java | 20 ++- .../broadcast/JGroupsBroadcastTest.java | 14 ++ .../integration/discovery/DiscoveryTest.java | 156 +++++++++++-------- 7 files changed, 151 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java index af0df2e..66b61d3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.artemis.api.core; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.jboss.logging.Logger; import org.jgroups.JChannel; @@ -38,32 +35,9 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory private final JChannelManager manager; - private static final Map managers = new ConcurrentHashMap<>(); + private static final JChannelManager singletonManager = JChannelManager.getInstance(); - private static final JChannelManager singletonManager = new JChannelManager(); -// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly -// -// private static JChannelManager recoverManager(JChannel channel) { -// JChannelManager manager = managers.get(channel); -// if (manager == null) { -// if (logger.isTraceEnabled()) { -// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace")); -// } -// manager = new JChannelManager(); -// managers.put(channel, manager); -// } -// else { -// if (logger.isTraceEnabled()) { -// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace")); -// } -// -// } -// -// return manager; -// } -// public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) { - // TODO: use recoverManager(channel) this(singletonManager, channel, channelName); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java index 9f783e7..f560c71 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java @@ -25,7 +25,7 @@ public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFac private String channelName; - private final JChannelManager manager = new JChannelManager(); + private final JChannelManager manager = JChannelManager.getInstance(); @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java index 8ed03ab..05867d7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java @@ -24,7 +24,7 @@ public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpo private String channelName; - private final JChannelManager manager = new JChannelManager(); + private final JChannelManager manager = JChannelManager.getInstance(); @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java index 1db4327..682bf76 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java @@ -32,15 +32,43 @@ import org.jboss.logging.Logger; */ public class JChannelManager { - private static final Logger logger = Logger.getLogger(JChannelManager.class); + private static final JChannelManager theInstance = new JChannelManager(); - private static Map channels; + public static JChannelManager getInstance() { + return theInstance; + } + + private JChannelManager() { + } + + public synchronized JChannelManager clear() { + for (JChannelWrapper wrapper : channels.values()) { + wrapper.closeChannel(); + } + channels.clear(); + setLoopbackMessages(false); + return this; + } + + // if true, messages will be loopbacked + // this is useful for testcases using a single channel. + private boolean loopbackMessages = false; + + private final Logger logger = Logger.getLogger(JChannelManager.class); + + private static final Map channels = new HashMap<>(); + + public boolean isLoopbackMessages() { + return loopbackMessages; + } + + public JChannelManager setLoopbackMessages(boolean loopbackMessages) { + this.loopbackMessages = loopbackMessages; + return this; + } public synchronized JChannelWrapper getJChannel(String channelName, JGroupsBroadcastEndpoint endpoint) throws Exception { - if (channels == null) { - channels = new HashMap<>(); - } JChannelWrapper wrapper = channels.get(channelName); if (wrapper == null) { wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java index eb61ffb..e83a33d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java @@ -86,15 +86,21 @@ public class JChannelWrapper { if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace")); if (refCount == 0) { if (closeWrappedChannel) { - connected = false; - channel.setReceiver(null); - logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace")); - channel.close(); - manager.removeChannel(channelName); + closeChannel(); } + manager.removeChannel(channelName); } } + public synchronized void closeChannel() { + connected = false; + channel.setReceiver(null); + if (logger.isTraceEnabled()) { + logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace")); + } + channel.close(); + } + public void removeReceiver(JGroupsReceiver receiver) { if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace")); synchronized (receivers) { @@ -128,7 +134,9 @@ public class JChannelWrapper { public void send(org.jgroups.Message msg) throws Exception { if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg); - msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); + if (!manager.isLoopbackMessages()) { + msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); + } channel.send(msg); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java index 53a6783..5bf36e9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java @@ -19,15 +19,29 @@ package org.apache.activemq.artemis.tests.integration.broadcast; import org.apache.activemq.artemis.api.core.BroadcastEndpoint; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.jgroups.JChannel; import org.jgroups.conf.PlainConfigurator; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; public class JGroupsBroadcastTest { + @After + public void cleanupJChannel() { + JChannelManager.getInstance().clear(); + } + + @Before + public void prepareJChannel() { + JChannelManager.getInstance().setLoopbackMessages(true); + } + + @Rule public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java index 0c667a8..a1faedc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.tests.integration.SimpleNotificationService; import org.apache.activemq.artemis.core.cluster.DiscoveryEntry; @@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** @@ -70,9 +72,15 @@ public class DiscoveryTest extends DiscoveryBaseTest { BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null; DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null; + @Before + public void prepareLoopback() { + JChannelManager.getInstance().setLoopbackMessages(true); + } + @Override @After public void tearDown() throws Exception { + JChannelManager.getInstance().clear().setLoopbackMessages(false); /** This file path is defined at {@link #TEST_JGROUPS_CONF_FILE} */ deleteDirectory(new File("./target/tmp/amqtest.ping.dir")); for (ActiveMQComponent component : new ActiveMQComponent[]{bg, bg1, bg2, bg3, dg, dg1, dg2, dg3}) { @@ -140,47 +148,52 @@ public class DiscoveryTest extends DiscoveryBaseTest { BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint(); broadcaster.openBroadcaster(); - int num = 100; - BroadcastEndpoint[] receivers = new BroadcastEndpoint[num]; - for (int i = 0; i < num; i++) { - receivers[i] = factory.createBroadcastEndpoint(); - receivers[i].openClient(); - } + try { - final byte[] data = new byte[]{1, 2, 3, 4, 5}; - broadcaster.broadcast(data); + int num = 100; + BroadcastEndpoint[] receivers = new BroadcastEndpoint[num]; + for (int i = 0; i < num; i++) { + receivers[i] = factory.createBroadcastEndpoint(); + receivers[i].openClient(); + } - for (int i = 0; i < num; i++) { - byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); - assertNotNull(received); - assertEquals(5, received.length); - assertEquals(1, received[0]); - assertEquals(2, received[1]); - assertEquals(3, received[2]); - assertEquals(4, received[3]); - assertEquals(5, received[4]); - } + final byte[] data = new byte[]{1, 2, 3, 4, 5}; + broadcaster.broadcast(data); + + for (int i = 0; i < num; i++) { + byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); + assertNotNull(received); + assertEquals(5, received.length); + assertEquals(1, received[0]); + assertEquals(2, received[1]); + assertEquals(3, received[2]); + assertEquals(4, received[3]); + assertEquals(5, received[4]); + } - for (int i = 0; i < num - 1; i++) { - receivers[i].close(false); - } + for (int i = 0; i < num - 1; i++) { + receivers[i].close(false); + } - byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); - assertNull(data1); + byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); + assertNull(data1); - broadcaster.broadcast(data); - data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); + broadcaster.broadcast(data); + data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS); - assertNotNull(data1); - assertEquals(5, data1.length); - assertEquals(1, data1[0]); - assertEquals(2, data1[1]); - assertEquals(3, data1[2]); - assertEquals(4, data1[3]); - assertEquals(5, data1[4]); + assertNotNull(data1); + assertEquals(5, data1.length); + assertEquals(1, data1[0]); + assertEquals(2, data1[1]); + assertEquals(3, data1[2]); + assertEquals(4, data1[3]); + assertEquals(5, data1[4]); - receivers[num - 1].close(false); - broadcaster.close(true); + receivers[num - 1].close(false); + } + finally { + broadcaster.close(true); + } } /** @@ -195,7 +208,6 @@ public class DiscoveryTest extends DiscoveryBaseTest { BroadcastEndpointFactory factory = new JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE); BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint(); broadcaster.openBroadcaster(); - int num = 50; BroadcastEndpoint[] receivers = new BroadcastEndpoint[num]; for (int i = 0; i < num; i++) { @@ -203,47 +215,53 @@ public class DiscoveryTest extends DiscoveryBaseTest { receivers[i].openClient(); } - final byte[] data = new byte[]{1, 2, 3, 4, 5}; - broadcaster.broadcast(data); - for (int i = 0; i < num; i++) { - byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); - assertNotNull(received); - assertEquals(5, received.length); - assertEquals(1, received[0]); - assertEquals(2, received[1]); - assertEquals(3, received[2]); - assertEquals(4, received[3]); - assertEquals(5, received[4]); - } + try { - for (int i = 0; i < num; i++) { - receivers[i].close(false); - } + final byte[] data = new byte[]{1, 2, 3, 4, 5}; + broadcaster.broadcast(data); + + for (int i = 0; i < num; i++) { + byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); + assertNotNull(received); + assertEquals(5, received.length); + assertEquals(1, received[0]); + assertEquals(2, received[1]); + assertEquals(3, received[2]); + assertEquals(4, received[3]); + assertEquals(5, received[4]); + } - //new ones - for (int i = 0; i < num; i++) { - receivers[i] = factory.createBroadcastEndpoint(); - receivers[i].openClient(); - } + for (int i = 0; i < num; i++) { + receivers[i].close(false); + } - broadcaster.broadcast(data); + //new ones + for (int i = 0; i < num; i++) { + receivers[i] = factory.createBroadcastEndpoint(); + receivers[i].openClient(); + } - for (int i = 0; i < num; i++) { - byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); - assertNotNull(received); - assertEquals(5, received.length); - assertEquals(1, received[0]); - assertEquals(2, received[1]); - assertEquals(3, received[2]); - assertEquals(4, received[3]); - assertEquals(5, received[4]); - } + broadcaster.broadcast(data); + + for (int i = 0; i < num; i++) { + byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS); + assertNotNull(received); + assertEquals(5, received.length); + assertEquals(1, received[0]); + assertEquals(2, received[1]); + assertEquals(3, received[2]); + assertEquals(4, received[3]); + assertEquals(5, received[4]); + } - for (int i = 0; i < num; i++) { - receivers[i].close(false); } - broadcaster.close(true); + finally { + for (int i = 0; i < num; i++) { + receivers[i].close(false); + } + broadcaster.close(true); + } } /**