activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-697 Making JChannelManager a singleton, and fixing tests
Date Wed, 24 Aug 2016 22:23:32 GMT
ARTEMIS-697 Making JChannelManager a singleton, and fixing tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/858d7a1a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/858d7a1a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/858d7a1a

Branch: refs/heads/master
Commit: 858d7a1a02ec18e253a9caba54e8d3cad9f04eed
Parents: bf4796c
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Aug 24 10:31:04 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Aug 24 18:08:17 2016 -0400

----------------------------------------------------------------------
 .../core/ChannelBroadcastEndpointFactory.java   |  28 +---
 .../JGroupsFileBroadcastEndpointFactory.java    |   2 +-
 ...roupsPropertiesBroadcastEndpointFactory.java |   2 +-
 .../api/core/jgroups/JChannelManager.java       |  38 ++++-
 .../api/core/jgroups/JChannelWrapper.java       |  20 ++-
 .../broadcast/JGroupsBroadcastTest.java         |  14 ++
 .../integration/discovery/DiscoveryTest.java    | 156 +++++++++++--------
 7 files changed, 151 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
index af0df2e..66b61d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.artemis.api.core;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.jboss.logging.Logger;
 import org.jgroups.JChannel;
@@ -38,32 +35,9 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
 
    private final JChannelManager manager;
 
-   private static final Map<JChannel, JChannelManager> managers = new ConcurrentHashMap<>();
+   private static final JChannelManager singletonManager = JChannelManager.getInstance();
 
-   private static final JChannelManager singletonManager = new JChannelManager();
-//  TODO: To implement this when JForkChannel from JGroups supports multiple channels properly
-//
-//   private static JChannelManager recoverManager(JChannel channel) {
-//      JChannelManager manager = managers.get(channel);
-//      if (manager == null) {
-//         if (logger.isTraceEnabled()) {
-//            logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
-//         }
-//         manager = new JChannelManager();
-//         managers.put(channel, manager);
-//      }
-//      else {
-//         if (logger.isTraceEnabled()) {
-//            logger.trace("Recover an already existent channelManager for " + channel, new
Exception("trace"));
-//         }
-//
-//      }
-//
-//      return manager;
-//   }
-//
    public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) {
-      // TODO: use recoverManager(channel)
       this(singletonManager, channel, channelName);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
index 9f783e7..f560c71 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
@@ -25,7 +25,7 @@ public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFac
 
    private String channelName;
 
-   private final JChannelManager manager = new JChannelManager();
+   private final JChannelManager manager = JChannelManager.getInstance();
 
    @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
index 8ed03ab..05867d7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
@@ -24,7 +24,7 @@ public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpo
 
    private String channelName;
 
-   private final JChannelManager manager = new JChannelManager();
+   private final JChannelManager manager = JChannelManager.getInstance();
 
    @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
index 1db4327..682bf76 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
@@ -32,15 +32,43 @@ import org.jboss.logging.Logger;
  */
 public class JChannelManager {
 
-   private static final Logger logger = Logger.getLogger(JChannelManager.class);
+   private static final JChannelManager theInstance = new JChannelManager();
 
-   private static Map<String, JChannelWrapper> channels;
+   public static JChannelManager getInstance() {
+      return theInstance;
+   }
+
+   private JChannelManager() {
+   }
+
+   public synchronized JChannelManager clear() {
+      for (JChannelWrapper wrapper : channels.values()) {
+         wrapper.closeChannel();
+      }
+      channels.clear();
+      setLoopbackMessages(false);
+      return this;
+   }
+
+   // if true, messages will be loopbacked
+   // this is useful for testcases using a single channel.
+   private boolean loopbackMessages = false;
+
+   private final Logger logger = Logger.getLogger(JChannelManager.class);
+
+   private static final Map<String, JChannelWrapper> channels = new HashMap<>();
+
+   public boolean isLoopbackMessages() {
+      return loopbackMessages;
+   }
+
+   public JChannelManager setLoopbackMessages(boolean loopbackMessages) {
+      this.loopbackMessages = loopbackMessages;
+      return this;
+   }
 
    public synchronized JChannelWrapper getJChannel(String channelName,
                                                    JGroupsBroadcastEndpoint endpoint) throws
Exception {
-      if (channels == null) {
-         channels = new HashMap<>();
-      }
       JChannelWrapper wrapper = channels.get(channelName);
       if (wrapper == null) {
          wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
index eb61ffb..e83a33d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
@@ -86,15 +86,21 @@ public class JChannelWrapper {
       if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on
channel " + channelName, new Exception("Trace"));
       if (refCount == 0) {
          if (closeWrappedChannel) {
-            connected = false;
-            channel.setReceiver(null);
-            logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
-            channel.close();
-            manager.removeChannel(channelName);
+            closeChannel();
          }
+         manager.removeChannel(channelName);
       }
    }
 
+   public synchronized void closeChannel() {
+      connected = false;
+      channel.setReceiver(null);
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
+      }
+      channel.close();
+   }
+
    public void removeReceiver(JGroupsReceiver receiver) {
       if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver +
" on "  + channelName, new Exception("Trace"));
       synchronized (receivers) {
@@ -128,7 +134,9 @@ public class JChannelWrapper {
 
    public void send(org.jgroups.Message msg) throws Exception {
       if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open="
+ channel.isOpen() + " on channel " + channelName + " msg=" + msg);
-      msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+      if (!manager.isLoopbackMessages()) {
+         msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+      }
       channel.send(msg);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
index 53a6783..5bf36e9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
@@ -19,15 +19,29 @@ package org.apache.activemq.artemis.tests.integration.broadcast;
 import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
 import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
 import org.jgroups.JChannel;
 import org.jgroups.conf.PlainConfigurator;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
 public class JGroupsBroadcastTest {
 
+   @After
+   public void cleanupJChannel() {
+      JChannelManager.getInstance().clear();
+   }
+
+   @Before
+   public void prepareJChannel() {
+      JChannelManager.getInstance().setLoopbackMessages(true);
+   }
+
+
    @Rule
    public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
index 0c667a8..a1faedc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
 import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -70,9 +72,15 @@ public class DiscoveryTest extends DiscoveryBaseTest {
    BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null;
    DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null;
 
+   @Before
+   public void prepareLoopback() {
+      JChannelManager.getInstance().setLoopbackMessages(true);
+   }
+
    @Override
    @After
    public void tearDown() throws Exception {
+      JChannelManager.getInstance().clear().setLoopbackMessages(false);
       /** This file path is defined at {@link #TEST_JGROUPS_CONF_FILE} */
       deleteDirectory(new File("./target/tmp/amqtest.ping.dir"));
       for (ActiveMQComponent component : new ActiveMQComponent[]{bg, bg1, bg2, bg3, dg, dg1,
dg2, dg3}) {
@@ -140,47 +148,52 @@ public class DiscoveryTest extends DiscoveryBaseTest {
       BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint();
       broadcaster.openBroadcaster();
 
-      int num = 100;
-      BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
-      for (int i = 0; i < num; i++) {
-         receivers[i] = factory.createBroadcastEndpoint();
-         receivers[i].openClient();
-      }
+      try {
 
-      final byte[] data = new byte[]{1, 2, 3, 4, 5};
-      broadcaster.broadcast(data);
+         int num = 100;
+         BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
+         for (int i = 0; i < num; i++) {
+            receivers[i] = factory.createBroadcastEndpoint();
+            receivers[i].openClient();
+         }
 
-      for (int i = 0; i < num; i++) {
-         byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
-         assertNotNull(received);
-         assertEquals(5, received.length);
-         assertEquals(1, received[0]);
-         assertEquals(2, received[1]);
-         assertEquals(3, received[2]);
-         assertEquals(4, received[3]);
-         assertEquals(5, received[4]);
-      }
+         final byte[] data = new byte[]{1, 2, 3, 4, 5};
+         broadcaster.broadcast(data);
+
+         for (int i = 0; i < num; i++) {
+            byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
+            assertNotNull(received);
+            assertEquals(5, received.length);
+            assertEquals(1, received[0]);
+            assertEquals(2, received[1]);
+            assertEquals(3, received[2]);
+            assertEquals(4, received[3]);
+            assertEquals(5, received[4]);
+         }
 
-      for (int i = 0; i < num - 1; i++) {
-         receivers[i].close(false);
-      }
+         for (int i = 0; i < num - 1; i++) {
+            receivers[i].close(false);
+         }
 
-      byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
-      assertNull(data1);
+         byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
+         assertNull(data1);
 
-      broadcaster.broadcast(data);
-      data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
+         broadcaster.broadcast(data);
+         data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
 
-      assertNotNull(data1);
-      assertEquals(5, data1.length);
-      assertEquals(1, data1[0]);
-      assertEquals(2, data1[1]);
-      assertEquals(3, data1[2]);
-      assertEquals(4, data1[3]);
-      assertEquals(5, data1[4]);
+         assertNotNull(data1);
+         assertEquals(5, data1.length);
+         assertEquals(1, data1[0]);
+         assertEquals(2, data1[1]);
+         assertEquals(3, data1[2]);
+         assertEquals(4, data1[3]);
+         assertEquals(5, data1[4]);
 
-      receivers[num - 1].close(false);
-      broadcaster.close(true);
+         receivers[num - 1].close(false);
+      }
+      finally {
+         broadcaster.close(true);
+      }
    }
 
    /**
@@ -195,7 +208,6 @@ public class DiscoveryTest extends DiscoveryBaseTest {
       BroadcastEndpointFactory factory = new JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE);
       BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint();
       broadcaster.openBroadcaster();
-
       int num = 50;
       BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
       for (int i = 0; i < num; i++) {
@@ -203,47 +215,53 @@ public class DiscoveryTest extends DiscoveryBaseTest {
          receivers[i].openClient();
       }
 
-      final byte[] data = new byte[]{1, 2, 3, 4, 5};
-      broadcaster.broadcast(data);
 
-      for (int i = 0; i < num; i++) {
-         byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
-         assertNotNull(received);
-         assertEquals(5, received.length);
-         assertEquals(1, received[0]);
-         assertEquals(2, received[1]);
-         assertEquals(3, received[2]);
-         assertEquals(4, received[3]);
-         assertEquals(5, received[4]);
-      }
+      try {
 
-      for (int i = 0; i < num; i++) {
-         receivers[i].close(false);
-      }
+         final byte[] data = new byte[]{1, 2, 3, 4, 5};
+         broadcaster.broadcast(data);
+
+         for (int i = 0; i < num; i++) {
+            byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
+            assertNotNull(received);
+            assertEquals(5, received.length);
+            assertEquals(1, received[0]);
+            assertEquals(2, received[1]);
+            assertEquals(3, received[2]);
+            assertEquals(4, received[3]);
+            assertEquals(5, received[4]);
+         }
 
-      //new ones
-      for (int i = 0; i < num; i++) {
-         receivers[i] = factory.createBroadcastEndpoint();
-         receivers[i].openClient();
-      }
+         for (int i = 0; i < num; i++) {
+            receivers[i].close(false);
+         }
 
-      broadcaster.broadcast(data);
+         //new ones
+         for (int i = 0; i < num; i++) {
+            receivers[i] = factory.createBroadcastEndpoint();
+            receivers[i].openClient();
+         }
 
-      for (int i = 0; i < num; i++) {
-         byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
-         assertNotNull(received);
-         assertEquals(5, received.length);
-         assertEquals(1, received[0]);
-         assertEquals(2, received[1]);
-         assertEquals(3, received[2]);
-         assertEquals(4, received[3]);
-         assertEquals(5, received[4]);
-      }
+         broadcaster.broadcast(data);
+
+         for (int i = 0; i < num; i++) {
+            byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
+            assertNotNull(received);
+            assertEquals(5, received.length);
+            assertEquals(1, received[0]);
+            assertEquals(2, received[1]);
+            assertEquals(3, received[2]);
+            assertEquals(4, received[3]);
+            assertEquals(5, received[4]);
+         }
 
-      for (int i = 0; i < num; i++) {
-         receivers[i].close(false);
       }
-      broadcaster.close(true);
+      finally {
+         for (int i = 0; i < num; i++) {
+            receivers[i].close(false);
+         }
+         broadcaster.close(true);
+      }
    }
 
    /**


Mime
View raw message