activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/6] activemq-artemis git commit: ARTEMIS-474 Clustering fails on certain topologies
Date Fri, 15 Apr 2016 01:14:47 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 914d93f63 -> a3ae2c4ad


ARTEMIS-474 Clustering fails on certain topologies

Communication between nodes fails under certain topologies.
JGroups provides a JForkChannel that container systems can create and inject into Artemis.
For reasons not yet understood, that channel cannot be reused for more than one channel per VM,
and it must never be closed.

I am keeping the trace logging I used to debug this issue, in case anything similar
happens again.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/630db2d6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/630db2d6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/630db2d6

Branch: refs/heads/master
Commit: 630db2d69c1de2a497169cb601390b55cd1cdd19
Parents: d6c7e30
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Apr 14 17:51:18 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Apr 14 18:55:01 2016 -0400

----------------------------------------------------------------------
 .../core/ChannelBroadcastEndpointFactory.java   |  54 +++++-
 .../api/core/JGroupsBroadcastEndpoint.java      | 167 ++-----------------
 .../core/JGroupsChannelBroadcastEndpoint.java   |   6 +-
 .../api/core/JGroupsFileBroadcastEndpoint.java  |   5 +-
 .../JGroupsFileBroadcastEndpointFactory.java    |   7 +-
 .../JGroupsPropertiesBroadcastEndpoint.java     |   8 +-
 ...roupsPropertiesBroadcastEndpointFactory.java |   6 +-
 .../api/core/jgroups/JChannelManager.java       |  62 +++++++
 .../api/core/jgroups/JChannelWrapper.java       | 145 ++++++++++++++++
 .../api/core/jgroups/JGroupsReceiver.java       |  72 ++++++++
 .../core/client/impl/ServerLocatorImpl.java     |   4 +-
 .../core/server/cluster/ClusterController.java  |   3 +-
 .../impl/SharedNothingBackupActivation.java     | 108 +++++++++++-
 13 files changed, 474 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
index be5e04c..d7086a5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+import org.jboss.logging.Logger;
 import org.jgroups.JChannel;
 
 /**
@@ -25,11 +30,49 @@ import org.jgroups.JChannel;
  */
 public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory {
 
+   private static final Logger logger = Logger.getLogger(ChannelBroadcastEndpointFactory.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
    private final JChannel channel;
 
    private final String channelName;
 
+   private final JChannelManager manager;
+
+   private static final Map<JChannel, JChannelManager> managers = new ConcurrentHashMap<>();
+
+   private static final JChannelManager singletonManager = new JChannelManager();
+//  TODO: To implement this when JForkChannel from JGroups supports multiple channels properly
+//
+//   private static JChannelManager recoverManager(JChannel channel) {
+//      JChannelManager manager = managers.get(channel);
+//      if (manager == null) {
+//         if (isTrace) {
+//            logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
+//         }
+//         manager = new JChannelManager();
+//         managers.put(channel, manager);
+//      }
+//      else {
+//         if (isTrace) {
+//            logger.trace("Recover an already existent channelManager for " + channel, new
Exception("trace"));
+//         }
+//
+//      }
+//
+//      return manager;
+//   }
+//
    public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) {
+      // TODO: use recoverManager(channel)
+      this(singletonManager, channel, channelName);
+   }
+
+   private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel channel, String
channelName) {
+      if (isTrace) {
+         logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " + channel +
", " + channelName, new Exception("trace"));
+      }
+      this.manager = manager;
       this.channel = channel;
       this.channelName = channelName;
    }
@@ -43,7 +86,16 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
    }
 
    @Override
+   public String toString() {
+      return "ChannelBroadcastEndpointFactory{" +
+         "channel=" + channel +
+         ", channelName='" + channelName + '\'' +
+         ", manager=" + manager +
+         '}';
+   }
+
+   @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
-      return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel();
+      return new JGroupsChannelBroadcastEndpoint(manager, channel, channelName).initChannel();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
index 5bcddbc..7657b0b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
@@ -16,15 +16,11 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
+import org.apache.activemq.artemis.api.core.jgroups.JGroupsReceiver;
+import org.jboss.logging.Logger;
 import org.jgroups.JChannel;
-import org.jgroups.ReceiverAdapter;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -32,6 +28,9 @@ import java.util.concurrent.TimeUnit;
  */
 public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
 
+   private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);
+
+   private static final boolean isTrace = logger.isTraceEnabled();
    private final String channelName;
 
    private boolean clientOpened;
@@ -42,12 +41,16 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
 
    private JGroupsReceiver receiver;
 
-   public JGroupsBroadcastEndpoint(String channelName) {
+   private JChannelManager manager;
+
+   public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) {
+      this.manager = manager;
       this.channelName = channelName;
    }
 
    @Override
    public void broadcast(final byte[] data) throws Exception {
+      if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen="
+ channel.getChannel().isOpen());
       if (broadcastOpened) {
          org.jgroups.Message msg = new org.jgroups.Message();
 
@@ -59,6 +62,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
 
    @Override
    public byte[] receiveBroadcast() throws Exception {
+      if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ",
channelOPen=" + channel.getChannel().isOpen());
       if (clientOpened) {
          return receiver.receiveBroadcast();
       }
@@ -69,6 +73,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
 
    @Override
    public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
+      if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ",
channelOPen=" + channel.getChannel().isOpen());
       if (clientOpened) {
          return receiver.receiveBroadcast(time, unit);
       }
@@ -99,7 +104,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
    public abstract JChannel createChannel() throws Exception;
 
    public JGroupsBroadcastEndpoint initChannel() throws Exception {
-      this.channel = JChannelManager.getJChannel(channelName, this);
+      this.channel = manager.getJChannel(channelName, this);
       return this;
    }
 
@@ -128,146 +133,4 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
       channel.close(true);
    }
 
-   /**
-    * This class is used to receive messages from a JGroups channel.
-    * Incoming messages are put into a queue.
-    */
-   private static final class JGroupsReceiver extends ReceiverAdapter {
-
-      private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
-
-      @Override
-      public void receive(org.jgroups.Message msg) {
-         dequeue.add(msg.getBuffer());
-      }
-
-      public byte[] receiveBroadcast() throws Exception {
-         return dequeue.take();
-      }
-
-      public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
-         return dequeue.poll(time, unit);
-      }
-   }
-
-   /**
-    * This class wraps a JChannel with a reference counter. The reference counter
-    * controls the life of the JChannel. When reference count is zero, the channel
-    * will be disconnected.
-    */
-   protected static class JChannelWrapper {
-
-      int refCount = 1;
-      JChannel channel;
-      String channelName;
-      final List<JGroupsReceiver> receivers = new ArrayList<>();
-
-      public JChannelWrapper(String channelName, JChannel channel) throws Exception {
-         this.refCount = 1;
-         this.channelName = channelName;
-         this.channel = channel;
-
-         //we always add this for the first ref count
-         channel.setReceiver(new ReceiverAdapter() {
-
-            @Override
-            public void receive(org.jgroups.Message msg) {
-               synchronized (receivers) {
-                  for (JGroupsReceiver r : receivers) {
-                     r.receive(msg);
-                  }
-               }
-            }
-         });
-      }
-
-      public synchronized void close(boolean closeWrappedChannel) {
-         refCount--;
-         if (refCount == 0) {
-            if (closeWrappedChannel) {
-               JChannelManager.closeChannel(this.channelName, channel);
-            }
-            else {
-               JChannelManager.removeChannel(this.channelName);
-            }
-            //we always remove the receiver as its no longer needed
-            channel.setReceiver(null);
-         }
-      }
-
-      public void removeReceiver(JGroupsReceiver receiver) {
-         synchronized (receivers) {
-            receivers.remove(receiver);
-         }
-      }
-
-      public synchronized void connect() throws Exception {
-         if (channel.isConnected())
-            return;
-         channel.connect(channelName);
-      }
-
-      public void addReceiver(JGroupsReceiver jGroupsReceiver) {
-         synchronized (receivers) {
-            receivers.add(jGroupsReceiver);
-         }
-      }
-
-      public void send(org.jgroups.Message msg) throws Exception {
-         channel.send(msg);
-      }
-
-      public JChannelWrapper addRef() {
-         this.refCount++;
-         return this;
-      }
-
-      @Override
-      public String toString() {
-         return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
-      }
-   }
-
-   /**
-    * This class maintain a global Map of JChannels wrapped in JChannelWrapper for
-    * the purpose of reference counting.
-    *
-    * Wherever a JChannel is needed it should only get it by calling the getChannel()
-    * method of this class. The real disconnect of channels are also done here only.
-    */
-   protected static class JChannelManager {
-
-      private static Map<String, JChannelWrapper> channels;
-
-      public static synchronized JChannelWrapper getJChannel(String channelName,
-                                                             JGroupsBroadcastEndpoint endpoint)
throws Exception {
-         if (channels == null) {
-            channels = new HashMap<>();
-         }
-         JChannelWrapper wrapper = channels.get(channelName);
-         if (wrapper == null) {
-            wrapper = new JChannelWrapper(channelName, endpoint.createChannel());
-            channels.put(channelName, wrapper);
-            return wrapper;
-         }
-         return wrapper.addRef();
-      }
-
-      public static synchronized void closeChannel(String channelName, JChannel channel)
{
-         channel.setReceiver(null);
-         channel.disconnect();
-         channel.close();
-         JChannelWrapper wrapper = channels.remove(channelName);
-         if (wrapper == null) {
-            throw new IllegalStateException("Did not find channel " + channelName);
-         }
-      }
-
-      public static void removeChannel(String channelName) {
-         JChannelWrapper wrapper = channels.remove(channelName);
-         if (wrapper == null) {
-            throw new IllegalStateException("Did not find channel " + channelName);
-         }
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
index 4fbb24c..96cfee6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
 import org.jgroups.JChannel;
 
 /**
@@ -27,8 +29,8 @@ public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint
{
 
    private final JChannel jChannel;
 
-   public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws
Exception {
-      super(channelName);
+   public JGroupsChannelBroadcastEndpoint(JChannelManager manager, JChannel jChannel, final
String channelName)  {
+      super(manager, channelName);
       this.jChannel = jChannel;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
index 702cb5a..be903d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.jgroups.JChannel;
 
 import java.net.URL;
@@ -27,8 +28,8 @@ public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint
 
    private String file;
 
-   public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws
Exception {
-      super(channelName);
+   public JGroupsFileBroadcastEndpoint(final JChannelManager manager, final String file,
final String channelName) throws Exception {
+      super(manager, channelName);
       this.file = file;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
index 132ac3c..9f783e7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
@@ -16,15 +16,20 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+
 public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory {
 
    private String file;
 
    private String channelName;
 
+   private final JChannelManager manager = new JChannelManager();
+
    @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
-      return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel();
+      return new JGroupsFileBroadcastEndpoint(manager, file, channelName).initChannel();
    }
 
    public String getFile() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
index 25cefc3..d10400a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
@@ -16,18 +16,19 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.jgroups.JChannel;
 import org.jgroups.conf.PlainConfigurator;
 
 /**
  * This class is the implementation of ActiveMQ Artemis members discovery that will use JGroups.
  */
-public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint {
+public final  class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint {
 
    private String properties;
 
-   public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName)
throws Exception {
-      super(channelName);
+   public JGroupsPropertiesBroadcastEndpoint(final JChannelManager manager, final String
properties, final String channelName) throws Exception {
+      super(manager, channelName);
       this.properties = properties;
    }
 
@@ -37,3 +38,4 @@ public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEn
       return new JChannel(configurator);
    }
 }
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
index 4d80435..8ed03ab 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
@@ -16,15 +16,19 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+
 public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory
{
 
    private String properties;
 
    private String channelName;
 
+   private final JChannelManager manager = new JChannelManager();
+
    @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
-      return new JGroupsPropertiesBroadcastEndpoint(properties, channelName).initChannel();
+      return new JGroupsPropertiesBroadcastEndpoint(manager, properties, channelName).initChannel();
    }
 
    public String getProperties() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
new file mode 100644
index 0000000..296dc8a
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.jgroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint;
+import org.jboss.logging.Logger;
+
+/**
+ * This class maintain a global Map of JChannels wrapped in JChannelWrapper for
+ * the purpose of reference counting.
+ *
+ * Wherever a JChannel is needed it should only get it by calling the getChannel()
+ * method of this class. The real disconnect of channels are also done here only.
+ */
+public class JChannelManager {
+
+   private static final Logger logger = Logger.getLogger(JChannelManager.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
+   private Map<String, JChannelWrapper> channels;
+
+   public synchronized JChannelWrapper getJChannel(String channelName,
+                                                   JGroupsBroadcastEndpoint endpoint) throws
Exception {
+      if (channels == null) {
+         channels = new HashMap<>();
+      }
+      JChannelWrapper wrapper = channels.get(channelName);
+      if (wrapper == null) {
+         wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());
+         channels.put(channelName, wrapper);
+         if (isTrace)
+            logger.trace("Put Channel " + channelName);
+         return wrapper;
+      }
+      if (isTrace)
+         logger.trace("Add Ref Count " + channelName);
+      return wrapper.addRef();
+   }
+
+   public synchronized void removeChannel(String channelName) {
+      channels.remove(channelName);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
new file mode 100644
index 0000000..08a8ff8
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.jgroups;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+import org.jgroups.JChannel;
+import org.jgroups.ReceiverAdapter;
+
+/**
+ * This class wraps a JChannel with a reference counter. The reference counter
+ * controls the life of the JChannel. When reference count is zero, the channel
+ * will be disconnected.
+ */
+public class JChannelWrapper {
+   private static final Logger logger = Logger.getLogger(JChannelWrapper.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
+   private boolean connected = false;
+   int refCount = 1;
+   final JChannel channel;
+   final String channelName;
+   final List<JGroupsReceiver> receivers = new ArrayList<>();
+   private final JChannelManager manager;
+
+   public JChannelWrapper(JChannelManager manager, final String channelName, JChannel channel)
throws Exception {
+      this.refCount = 1;
+      this.channelName = channelName;
+      this.channel = channel;
+      this.manager = manager;
+
+
+      if (channel.getReceiver() != null) {
+         logger.warn("The channel already had a receiver previously!!!!", new Exception("trace"));
+      }
+
+      //we always add this for the first ref count
+      channel.setReceiver(new ReceiverAdapter() {
+
+         @Override
+         public void receive(org.jgroups.Message msg) {
+            if (isTrace) {
+               logger.trace(this + ":: Wrapper received " + msg + " on channel " + channelName);
+            }
+            synchronized (receivers) {
+               for (JGroupsReceiver r : receivers) {
+                  r.receive(msg);
+               }
+            }
+         }
+      });
+   }
+
+   public JChannel getChannel() {
+      return channel;
+   }
+
+   public String getChannelName() {
+      return channelName;
+   }
+
+   public synchronized void close(boolean closeWrappedChannel) {
+      refCount--;
+      if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName,
new Exception("Trace"));
+      if (refCount == 0) {
+         if (closeWrappedChannel) {
+            connected = false;
+            channel.setReceiver(null);
+            logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
+            channel.close();
+         }
+         manager.removeChannel(channelName);
+      }
+   }
+
+   public void removeReceiver(JGroupsReceiver receiver) {
+      if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on "  + channelName,
new Exception("Trace"));
+      synchronized (receivers) {
+         receivers.remove(receiver);
+      }
+   }
+
+   public synchronized void connect() throws Exception {
+      if (isTrace) {
+         logger.trace(this + ":: Connecting " + channelName, new Exception("Trace"));
+      }
+
+      // It is important to check this otherwise we could reconnect an already connected
channel
+      if (connected) {
+         return;
+      }
+
+      connected = true;
+
+      if (!channel.isConnected()) {
+         channel.connect(channelName);
+      }
+   }
+
+   public void addReceiver(JGroupsReceiver jGroupsReceiver) {
+      synchronized (receivers) {
+         if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " +
channelName);
+         receivers.add(jGroupsReceiver);
+      }
+   }
+
+   public void send(org.jgroups.Message msg) throws Exception {
+      if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen()
+ " on channel " + channelName + " msg=" + msg);
+      channel.send(msg);
+   }
+
+   public JChannelWrapper addRef() {
+      this.refCount++;
+      if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName);
+      return this;
+   }
+
+   @Override
+   public String toString() {
+      return super.toString() +
+         "{refCount=" + refCount +
+         ", channel=" + channel +
+         ", channelName='" + channelName + '\'' +
+         ", connected=" + connected +
+         '}';
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
new file mode 100644
index 0000000..c931661
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.jgroups;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.logging.Logger;
+import org.jgroups.ReceiverAdapter;
+
+/**
+ * This class is used to receive messages from a JGroups channel.
+ * The payload of each incoming message is put into an unbounded queue. Payloads are taken from
+ * the queue by {@link #receiveBroadcast()} (blocking) or
+ * {@link #receiveBroadcast(long, TimeUnit)} (bounded wait).
+ */
+public class JGroupsReceiver extends ReceiverAdapter {
+
+   private static final Logger logger = Logger.getLogger(JGroupsReceiver.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
+   // Received payloads in arrival order; LinkedBlockingDeque's no-arg constructor is unbounded
+   // and it rejects null elements.
+   private final BlockingQueue<byte[]> queue = new LinkedBlockingDeque<>();
+
+   /**
+    * JGroups callback for each incoming message: enqueues the message payload.
+    * <p>
+    * A message whose {@code getBuffer()} is {@code null} (a message carrying no buffer) is skipped.
+    * The queue does not accept {@code null}, so enqueuing it would throw a NullPointerException
+    * from inside the JGroups receive callback.
+    *
+    * @param msg the message delivered by the channel
+    */
+   @Override
+   public void receive(org.jgroups.Message msg) {
+      // This is the receiving side; the previous "sending message" text was misleading in traces.
+      if (isTrace) logger.trace("received message " + msg);
+      byte[] payload = msg.getBuffer();
+      if (payload == null) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Ignoring JGroups message without payload: " + msg);
+         }
+         return;
+      }
+      queue.add(payload);
+   }
+
+   /**
+    * Takes the next received payload, blocking until one is available.
+    *
+    * @return the next payload
+    * @throws Exception if interrupted while waiting
+    */
+   public byte[] receiveBroadcast() throws Exception {
+      byte[] bytes = queue.take();
+      if (isTrace) {
+         logBytes("receiveBroadcast()", bytes);
+      }
+
+      return bytes;
+   }
+
+   // Trace helper: logs the payload size, or that no payload was obtained (e.g. a poll timeout).
+   private void logBytes(String methodName, byte[] bytes) {
+      if (bytes != null) {
+         logger.trace(methodName + "::" + bytes.length + " bytes");
+      }
+      else {
+         logger.trace(methodName + ":: no bytes");
+      }
+   }
+
+   /**
+    * Takes the next received payload, waiting at most the given time.
+    *
+    * @param time maximum time to wait
+    * @param unit unit of {@code time}
+    * @return the next payload, or {@code null} if none arrived before the timeout
+    * @throws Exception if interrupted while waiting
+    */
+   public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
+      byte[] bytes = queue.poll(time, unit);
+
+      if (isTrace) {
+         logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes);
+      }
+
+      return bytes;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index e7cc55a..53ba9df 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -809,7 +809,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
       // how the sendSubscription happens.
       // in case this ever changes.
       if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS))
{
-         factory.cleanup();
+         if (factory != null) {
+            factory.cleanup();
+         }
          throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index b84164f..175ca99 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -408,8 +408,9 @@ public class ClusterController implements ActiveMQComponent {
             }
          }
          catch (ActiveMQException e) {
-            if (!started)
+            if (!started) {
                return;
+            }
             server.getScheduledPool().schedule(this, serverLocator.getRetryInterval(), TimeUnit.MILLISECONDS);
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 12e9298..d9a5c78 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBacku
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -53,6 +54,10 @@ import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothi
 
 public final class SharedNothingBackupActivation extends Activation {
 
+
+   private static final Logger logger = Logger.getLogger(SharedNothingBackupActivation.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
    //this is how we act when we start as a backup
    private ReplicaPolicy replicaPolicy;
 
@@ -129,43 +134,86 @@ public final class SharedNothingBackupActivation extends Activation
{
          }
          ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
          clusterController.addClusterTopologyListenerForReplication(nodeLocator);
+
+         if (isTrace) {
+            logger.trace("Waiting on cluster connection");
+         }
          //todo do we actually need to wait?
          clusterController.awaitConnectionToReplicationCluster();
 
+         if (isTrace) {
+            logger.trace("Cluster Connected");
+         }
          clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer,
nodeLocator));
 
          // nodeManager.startBackup();
-
+         if (isTrace) {
+            logger.trace("Starting backup manager");
+         }
          activeMQServer.getBackupManager().start();
 
+         if (isTrace) {
+            logger.trace("Set backup Quorum");
+         }
          replicationEndpoint.setBackupQuorum(backupQuorum);
+
          replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
          EndpointConnector endpointConnector = new EndpointConnector();
 
+         if (isTrace) {
+            logger.trace("Starting Backup Server");
+         }
+
          ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(),
activeMQServer.getNodeManager().getNodeId());
          activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
 
+         if (isTrace) logger.trace("Setting server state as started");
+
          SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
          do {
-            //locate the first live server to try to replicate
-            nodeLocator.locateNode();
+
+
             if (closed) {
+               if (isTrace) {
+                  logger.trace("Activation is closed, so giving up");
+               }
                return;
             }
+
+
+            if (isTrace) {
+               logger.trace("looking up the node through nodeLocator.locateNode()");
+            }
+            //locate the first live server to try to replicate
+            nodeLocator.locateNode();
             Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
             nodeID = nodeLocator.getNodeID();
+
+            if (isTrace) {
+               logger.trace("nodeID = " + nodeID);
+            }
             //in a normal (non failback) scenario if we couldn't find our live server we
should fail
             if (!attemptFailBack) {
+               if (isTrace) {
+                  logger.trace("attemptFailback=false, nodeID=" + nodeID);
+               }
+
                //this shouldn't happen
-               if (nodeID == null)
+               if (nodeID == null) {
+                  logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false");
                   throw new RuntimeException("Could not establish the connection");
+               }
                activeMQServer.getNodeManager().setNodeID(nodeID);
             }
 
             try {
+               if (isTrace) {
+                  logger.trace("Calling clusterController.connectToNodeInReplicatedCluster("
+ possibleLive.getA() + ")");
+               }
                clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
             }
             catch (Exception e) {
+               logger.debug(e.getMessage(), e);
                if (possibleLive.getB() != null) {
                   try {
                      clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB());
@@ -176,6 +224,10 @@ public final class SharedNothingBackupActivation extends Activation {
                }
             }
             if (clusterControl == null) {
+
+               if (isTrace) {
+                  logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster()
+ " it should retry");
+               }
                //its ok to retry here since we haven't started replication yet
                //it may just be the server has gone since discovery
                Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
@@ -190,23 +242,43 @@ public final class SharedNothingBackupActivation extends Activation
{
              * process again on the next live server.  All the action happens inside {@link
BackupQuorum}
              */
             signal = backupQuorum.waitForStatusChange();
+
+            if (isTrace) {
+               logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()");
+            }
+
             /**
              * replicationEndpoint will be holding lots of open files. Make sure they get
              * closed/sync'ed.
              */
             ActiveMQServerImpl.stopComponent(replicationEndpoint);
             // time to give up
-            if (!activeMQServer.isStarted() || signal == STOP)
+            if (!activeMQServer.isStarted() || signal == STOP) {
+               if (isTrace) {
+                  logger.trace("giving up on the activation:: activemqServer.isStarted="
+ activeMQServer.isStarted() + " while signal = " + signal);
+               }
                return;
+            }
                // time to fail over
-            else if (signal == FAIL_OVER)
+            else if (signal == FAIL_OVER) {
+               if (isTrace) {
+                  logger.trace("signal == FAIL_OVER, breaking the loop");
+               }
                break;
+            }
                // something has gone badly run restart from scratch
             else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING)
{
+               if (isTrace) {
+                  logger.trace("Starting a new thread to stop the server!");
+               }
+
                Thread startThread = new Thread(new Runnable() {
                   @Override
                   public void run() {
                      try {
+                        if (isTrace) {
+                           logger.trace("Calling activeMQServer.stop()");
+                        }
                         activeMQServer.stop();
                      }
                      catch (Exception e) {
@@ -227,17 +299,30 @@ public final class SharedNothingBackupActivation extends Activation
{
             }
          } while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
 
+         if (isTrace) {
+            logger.trace("Activation loop finished, current signal = " + signal);
+         }
+
          activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum);
 
          if (!isRemoteBackupUpToDate()) {
+            logger.debug("throwing exception for !isRemoteBackupUptoDate");
             throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync();
          }
 
+
+         if (isTrace) {
+            logger.trace("setReplicaPolicy::" + replicaPolicy);
+         }
+
          replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
          activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy());
+
          synchronized (activeMQServer) {
-            if (!activeMQServer.isStarted())
+            if (!activeMQServer.isStarted()) {
+               logger.trace("Server is stopped, giving up right before becomingLive");
                return;
+            }
             ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
             activeMQServer.getNodeManager().stopBackup();
             activeMQServer.getStorageManager().start();
@@ -262,6 +347,9 @@ public final class SharedNothingBackupActivation extends Activation {
          }
       }
       catch (Exception e) {
+         if (isTrace) {
+            logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(),
e);
+         }
          if ((e instanceof InterruptedException || e instanceof IllegalStateException) &&
!activeMQServer.isStarted())
             // do not log these errors if the server is being stopped.
             return;
@@ -374,8 +462,10 @@ public final class SharedNothingBackupActivation extends Activation {
     * @throws ActiveMQException
     */
    public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage)
throws ActiveMQException {
-      ActiveMQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage +
", backupUpToDate=" +
-                                           backupUpToDate);
+      if (isTrace) {
+         logger.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate="
+
+                         backupUpToDate);
+      }
       if (!activeMQServer.getHAPolicy().isBackup() || activeMQServer.getHAPolicy().isSharedStore())
{
          throw new ActiveMQInternalErrorException();
       }


Mime
View raw message