activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-281 - fix reference counting for jgroups channels
Date Tue, 27 Oct 2015 14:03:43 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7f60ff20a -> 8f3157a5b


ARTEMIS-281 - fix reference counting for jgroups channels

https://issues.apache.org/jira/browse/ARTEMIS-281


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/290cb65b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/290cb65b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/290cb65b

Branch: refs/heads/master
Commit: 290cb65b1733092344fe08b1c10e91a1a20daa30
Parents: 7f60ff2
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Tue Oct 27 08:12:29 2015 +0000
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Tue Oct 27 10:10:52 2015 +0000

----------------------------------------------------------------------
 .../api/core/JGroupsBroadcastEndpoint.java      | 23 +++--
 .../core/JGroupsChannelBroadcastEndpoint.java   |  4 +-
 .../broadcast/JGroupsBroadcastTest.java         | 90 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/290cb65b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
index df66450..b75531e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
@@ -110,15 +110,16 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
          channel.removeReceiver(receiver);
          clientOpened = false;
       }
-      internalCloseChannel();
+      internalCloseChannel(channel);
    }
 
    /**
     * Closes the channel used in this JGroups Broadcast.
     * Can be overridden by implementations that use an externally managed channel.
+    * @param channel
     */
-   protected synchronized void internalCloseChannel() {
-      channel.close();
+   protected synchronized void internalCloseChannel(JChannelWrapper channel) {
+      channel.close(true);
    }
 
    /**
@@ -161,10 +162,15 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
          this.channel = channel;
       }
 
-      public synchronized void close() {
+      public synchronized void close(boolean closeWrappedChannel) {
          refCount--;
          if (refCount == 0) {
-            JChannelManager.closeChannel(this.channelName, channel);
+            if (closeWrappedChannel) {
+               JChannelManager.closeChannel(this.channelName, channel);
+            }
+            else {
+               JChannelManager.removeChannel(this.channelName);
+            }
          }
       }
 
@@ -246,5 +252,12 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
             throw new IllegalStateException("Did not find channel " + channelName);
          }
       }
+
+      public static void removeChannel(String channelName) {
+         JChannelWrapper wrapper = channels.remove(channelName);
+         if (wrapper == null) {
+            throw new IllegalStateException("Did not find channel " + channelName);
+         }
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/290cb65b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
index 9793c78..4fbb24c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
@@ -38,7 +38,7 @@ public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint
{
    }
 
    @Override
-   protected synchronized void internalCloseChannel() {
-      // no-op, this version takes an externally managed channel.
+   protected synchronized void internalCloseChannel(JChannelWrapper channel) {
+      channel.close(false);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/290cb65b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
new file mode 100644
index 0000000..83b8f28
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.broadcast;
+
+import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
+import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
+import org.jgroups.JChannel;
+import org.jgroups.conf.PlainConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JGroupsBroadcastTest {
+
+   private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;"
+ "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;"
+ "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;"
+ "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;"
+ "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;"
+ "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;"
+ "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;"
+ "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;"
+ "oob_thread_pool.queue_enab
 led=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;"
+ "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)"
+ ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;"
+ "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;"
+ "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" +
"xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;"
+ "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;"
+ "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)"
+ ":RSVP(resend_interval=5
 00;ack_on_delivery=false;timeout=60000)";
+
+
+
+
+   @Test
+   public void testRefCount() throws Exception {
+      try {
+
+         PlainConfigurator configurator = new PlainConfigurator(jgroupsConfigString);
+         JChannel channel = new JChannel(configurator);
+
+         String channelName1 = "channel1";
+
+         BroadcastEndpointFactory jgroupsBroadcastCfg1 = new ChannelBroadcastEndpointFactory(channel,
channelName1);
+
+         BroadcastEndpoint channelEndpoint1 = jgroupsBroadcastCfg1.createBroadcastEndpoint();
+
+         BroadcastEndpoint channelEndpoint2 = jgroupsBroadcastCfg1.createBroadcastEndpoint();
+
+         BroadcastEndpoint channelEndpoint3 = jgroupsBroadcastCfg1.createBroadcastEndpoint();
+
+         channelEndpoint1.close(true);
+
+         Assert.assertTrue(channel.isOpen());
+
+         channelEndpoint2.close(true);
+
+         Assert.assertTrue(channel.isOpen());
+
+         channelEndpoint3.close(true);
+
+         Assert.assertTrue(channel.isOpen());
+
+         channel.close();
+
+         //after we close the last endpoint reference counting will close the channel so
once we create a new one the
+         // channel wrapper is recreated
+         try {
+            channelEndpoint2.openClient();
+            Assert.fail("this should be closed");
+         }
+         catch (Exception e) {
+         }
+
+         JChannel newChannel = new JChannel(configurator);
+
+         jgroupsBroadcastCfg1 = new ChannelBroadcastEndpointFactory(newChannel, channelName1);
+
+         channelEndpoint1 = jgroupsBroadcastCfg1.createBroadcastEndpoint();
+
+         channelEndpoint1.openClient();
+
+
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+         throw e;
+      }
+   }
+
+}


Mime
View raw message