activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [7/8] activemq-6 git commit: ACTIVEMQ6-7 - Improve Serialization on Connection Factory
Date Thu, 12 Feb 2015 20:01:47 GMT
ACTIVEMQ6-7 - Improve Serialization on Connection Factory

https://issues.apache.org/jira/browse/ACTIVEMQ6-7

The connection factory is now externalizable and is serialized as a string that represents a URI. There are schemas for every possible type of connection factory and server locator.

The client JNDI representation of factories has also been changed to be consistent with this.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/3b76ccc9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/3b76ccc9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/3b76ccc9

Branch: refs/heads/master
Commit: 3b76ccc92b5f0e00de54a4fd6c3705d1b8509771
Parents: b24d729
Author: Andy Taylor <andytaylor@apache.org>
Authored: Thu Jan 29 09:18:36 2015 +0000
Committer: Andy Taylor <andytaylor@apache.org>
Committed: Thu Feb 12 09:14:24 2015 +0000

----------------------------------------------------------------------
 .../activemq/utils/uri/SchemaConstants.java     |  31 ++
 .../apache/activemq/utils/uri/URIFactory.java   |  85 +++-
 .../apache/activemq/utils/uri/URISchema.java    |  86 +++-
 .../apache/activemq/utils/URIParserTest.java    |  12 +
 .../api/core/BroadcastEndpointFactory.java      |   2 +-
 .../BroadcastEndpointFactoryConfiguration.java  |  28 --
 .../api/core/BroadcastGroupConfiguration.java   |  18 +-
 .../core/ChannelBroadcastEndpointFactory.java   |  41 ++
 .../api/core/DiscoveryGroupConfiguration.java   |  67 +--
 ...ryGroupConfigurationCompatibilityHelper.java |  45 ---
 .../api/core/JGroupsBroadcastEndpoint.java      | 281 +++++++++++++
 .../JGroupsBroadcastGroupConfiguration.java     | 404 -------------------
 .../core/JGroupsChannelBroadcastEndpoint.java   |  39 ++
 .../api/core/JGroupsFileBroadcastEndpoint.java  |  49 +++
 .../JGroupsFileBroadcastEndpointFactory.java    |  55 +++
 .../JGroupsPropertiesBroadcastEndpoint.java     |  43 ++
 ...roupsPropertiesBroadcastEndpointFactory.java |  55 +++
 .../api/core/UDPBroadcastEndpointFactory.java   | 330 +++++++++++++++
 .../core/UDPBroadcastGroupConfiguration.java    | 339 ----------------
 .../api/core/client/ActiveMQClient.java         |  14 +
 .../api/core/client/TopologyMember.java         |   4 +-
 .../core/client/impl/ServerLocatorImpl.java     |   9 +-
 .../remoting/ClientProtocolManagerFactory.java  |   4 +-
 .../uri/AbstractServerLocatorSchema.java        |  34 ++
 .../apache/activemq/uri/ConnectionOptions.java  |  75 ++++
 .../activemq/uri/InVMServerLocatorSchema.java   |  73 ++++
 .../uri/JGroupsServerLocatorSchema.java         | 102 +++++
 .../activemq/uri/ServerLocatorParser.java       |  33 ++
 .../activemq/uri/TCPServerLocatorSchema.java    | 160 ++++++++
 .../activemq/uri/UDPServerLocatorSchema.java    |  89 ++++
 activemq-jms-client/pom.xml                     |   7 +
 .../activemq/api/jms/ActiveMQJMSClient.java     |  13 +
 .../jms/client/ActiveMQConnectionFactory.java   |  70 +++-
 .../client/ActiveMQJMSConnectionFactory.java    |   1 -
 .../jndi/ActiveMQInitialContextFactory.java     | 359 ++--------------
 .../apache/activemq/uri/AbstractCFSchema.java   |   6 +-
 .../activemq/uri/ConnectionFactoryParser.java   |   2 +
 .../apache/activemq/uri/ConnectionOptions.java  | 120 ------
 .../org/apache/activemq/uri/InVMSchema.java     |  51 +++
 .../org/apache/activemq/uri/JGroupsSchema.java  |  53 ++-
 .../activemq/uri/JMSConnectionOptions.java      |  79 ++++
 .../java/org/apache/activemq/uri/TCPSchema.java |  67 +++
 .../java/org/apache/activemq/uri/UDPSchema.java |  33 +-
 .../activemq/uri/ConnectionFactoryURITest.java  | 368 ++++++++++++++++-
 .../activemq/ra/ActiveMQResourceAdapter.java    |  33 +-
 .../activemq/core/config/Configuration.java     |   3 +-
 .../core/config/impl/ConfigurationImpl.java     |   3 +-
 .../deployers/impl/FileConfigurationParser.java |  26 +-
 .../impl/BroadcastGroupControlImpl.java         |  14 +-
 .../remoting/impl/invm/InVMAcceptorFactory.java |   1 -
 .../impl/invm/InVMConnectorFactory.java         |   1 -
 .../remoting/impl/invm/TransportConstants.java  |   5 -
 .../impl/netty/NettyAcceptorFactory.java        |   1 -
 .../server/impl/RemotingServiceImpl.java        |  15 -
 .../core/server/cluster/ClusterManager.java     |   2 +-
 .../spi/core/remoting/AcceptorFactory.java      |   1 -
 .../core/config/impl/FileConfigurationTest.java |  16 +-
 docs/user-manual/en/using-jms.md                | 105 ++---
 .../aerogear/src/main/resources/jndi.properties |   2 +-
 .../ApplicationLayerFailoverExample.java        |   2 +-
 .../activemq/jms/example/BridgeExample.java     |   4 +-
 .../browser/src/main/resources/jndi.properties  |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../ClusteredDurableSubscriptionExample.java    |   4 +-
 .../jms/example/ClusteredGroupingExample.java   |   6 +-
 .../jms/example/ClusteredJgroupsExample.java    |   4 +-
 .../activemq/server0/client-jndi.properties     |   2 +-
 .../jms/example/ClusteredQueueExample.java      |   4 +-
 .../jms/example/ClusteredStandaloneExample.java |   6 +-
 .../example/StaticClusteredQueueExample.java    |   2 +-
 .../jms/example/ClusterStaticOnewayExample.java |   2 +-
 .../jms/example/ClusteredTopicExample.java      |   4 +-
 .../ColocatedFailoverScaleDownExample.java      |   8 +-
 .../jms/example/ColocatedFailoverExample.java   |   8 +-
 .../src/main/resources/jndi.properties          |   3 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../activemq/jms/example/DivertExample.java     |   4 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../expiry/src/main/resources/jndi.properties   |   2 +-
 .../jms/example/HAPolicyAutoBackupExample.java  |  12 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../activemq/jms/example/JMSBridgeExample.java  |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../jms/jmx/src/main/resources/jndi.properties  |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   3 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   3 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../paging/src/main/resources/jndi.properties   |   2 +-
 .../jms/perf/src/main/resources/jndi.properties |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   3 +-
 .../QueueMessageRedistributionExample.java      |   4 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../queue/src/main/resources/jndi.properties    |   2 +-
 .../activemq/jms/example/ReattachExample.java   |   2 +-
 .../src/main/resources/jndi.properties          |   7 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../activemq/jms/example/ScaleDownExample.java  |  12 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../security/src/main/resources/jndi.properties |   2 +-
 .../src/main/resources/jndi.properties          |   3 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../stomp/src/main/resources/jndi.properties    |   2 +-
 .../stomp1.1/src/main/resources/jndi.properties |   2 +-
 .../stomp1.2/src/main/resources/jndi.properties |   2 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../jms/example/SymmetricClusterExample.java    |   7 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../topic/src/main/resources/jndi.properties    |   2 +-
 .../src/main/resources/jndi.properties          |   6 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../src/main/resources/jndi.properties          |   2 +-
 .../xa-send/src/main/resources/jndi.properties  |   2 +-
 .../activemq/jms/soak/example/SoakReceiver.java |   2 +-
 .../activemq/jms/soak/example/SoakSender.java   |   2 +-
 .../client/ServerLocatorSerializationTest.java  | 131 ------
 .../integration/client/SessionFactoryTest.java  |  48 +--
 .../BridgeWithDiscoveryGroupStartTest.java      |   8 +-
 .../cluster/distribution/ClusterTestBase.java   |  14 +-
 .../HAClientTopologyWithDiscoveryTest.java      |   8 +-
 .../discovery/DiscoveryBaseTest.java            |  12 +-
 .../discovery/DiscoveryStayAliveTest.java       |  10 +-
 .../integration/discovery/DiscoveryTest.java    |  67 ++-
 .../jms/ActiveMQConnectionFactoryTest.java      |  16 +-
 .../integration/jms/SimpleJNDIClientTest.java   | 383 ++++--------------
 .../ConnectionFactorySerializationTest.java     | 325 +++++++++++++--
 ...tionFactoryWithJGroupsSerializationTest.java |  19 +-
 .../jms/server/JMSServerDeployerTest.java       |  10 +-
 .../management/ActiveMQServerControlTest.java   |   3 +-
 .../management/BroadcastGroupControlTest.java   |  12 +-
 .../ClusterConnectionControl2Test.java          |  14 +-
 .../ClusterConnectionControlTest.java           |   8 +-
 .../ra/ActiveMQRAClusteredTestBase.java         |   5 +-
 .../integration/ra/ResourceAdapterTest.java     |   8 +-
 .../src/test/resources/jndi.properties          |   3 +-
 .../jtests/jms/framework/PTPTestCase.java       |   3 +-
 .../jtests/jms/framework/PubSubTestCase.java    |   3 +-
 .../jtests/jms/framework/UnifiedTestCase.java   |   5 +-
 .../tests/unit/ra/ResourceAdapterTest.java      |   4 +-
 167 files changed, 3112 insertions(+), 2352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java
new file mode 100644
index 0000000..6e7afe1
--- /dev/null
+++ b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.utils.uri;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class SchemaConstants
+{
+   public static final String TCP = "tcp";
+
+   public static final String UDP = "udp";
+
+   public static final String JGROUPS = "jgroups";
+
+   public static final String VM = "vm";
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
index 4bc5fa4..452bedd 100644
--- a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
+++ b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
@@ -18,13 +18,13 @@
 package org.apache.activemq.utils.uri;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author clebertsuconic
  */
-
 public class URIFactory<T>
 {
 
@@ -48,11 +48,25 @@ public class URIFactory<T>
       schemaFactory.setFactory(this);
    }
 
-   public void removeSchema(final String schemaName)
+   public void removeSchema(final SchemaConstants schemaName)
    {
       schemas.remove(schemaName);
    }
 
+   public T newObject(String uriString) throws Exception
+   {
+      URI uri = normalise(uriString);
+      URISchema<T> schemaFactory = schemas.get(uri.getScheme());
+
+      if (schemaFactory == null)
+      {
+         throw new NullPointerException("Schema " + uri.getScheme() + " not found");
+      }
+
+
+      return schemaFactory.newObject(uri);
+   }
+
    public T newObject(URI uri) throws Exception
    {
       URISchema<T> schemaFactory = schemas.get(uri.getScheme());
@@ -66,5 +80,72 @@ public class URIFactory<T>
       return schemaFactory.newObject(uri);
    }
 
+   public void populateObject(URI uri, T bean) throws Exception
+   {
+      URISchema<T> schemaFactory = schemas.get(uri.getScheme());
 
+      if (schemaFactory == null)
+      {
+         throw new NullPointerException("Schema " + uri.getScheme() + " not found");
+      }
+
+      schemaFactory.populateObject(uri, bean);
+   }
+
+   public URI createSchema(String scheme, T bean) throws Exception
+   {
+      URISchema<T> schemaFactory = schemas.get(scheme);
+
+      if (schemaFactory == null)
+      {
+         throw new NullPointerException("Schema " + scheme + " not found");
+      }
+      return schemaFactory.newURI(bean);
+   }
+
+   /*
+   * This method is used to change a string with multiple URIs in it into a valid URI.
+   * For instance, it is possible to have the following string:
+   * (tcp://localhost:5445,tcp://localhost:5545,tcp://localhost:5555)?someQuery
+   * This is an invalid URI, so it is changed so that the first URI is used and the
+   * extra ones are added as the URI fragment, like so:
+   * tcp://localhost:5445?someQuery#tcp://localhost:5545,tcp://localhost:5555
+   *
+   * It is the job of the URISchema implementation to handle these fragments as needed.
+   * */
+   private URI normalise(String uri) throws URISyntaxException
+   {
+      if (uri.startsWith("("))
+      {
+         String[] split = uri.split("\\)");
+         String[] connectorURIS = split[0].substring(split[0].indexOf('(') + 1).split(",");
+         String factoryQuery = split.length > 1 ? split[1] : "";
+         StringBuilder builder = new StringBuilder(connectorURIS[0]);
+         if (factoryQuery != null && factoryQuery.length() > 0)
+         {
+            if (connectorURIS[0].contains("?"))
+            {
+               builder.append("&").append(factoryQuery.substring(1));
+            }
+            else
+            {
+               builder.append(factoryQuery);
+            }
+         }
+         if (connectorURIS.length > 1)
+         {
+            builder.append("#");
+            for (int i = 1; i < connectorURIS.length; i++)
+            {
+               if (i > 1)
+               {
+                  builder.append(",");
+               }
+               builder.append(connectorURIS[i]);
+            }
+         }
+         return new URI(builder.toString());
+      }
+      return new URI(uri);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
index 4226d71..26ec4cd 100644
--- a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
+++ b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
@@ -14,15 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.utils.uri;
 
+import java.beans.PropertyDescriptor;
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.beanutils.BeanUtilsBean;
 import org.apache.commons.beanutils.FluentPropertyBeanIntrospector;
@@ -40,6 +42,16 @@ public abstract class URISchema<T>
       return newObject(uri, null);
    }
 
+   public void populateObject(URI uri, T bean) throws Exception
+   {
+      setData(uri, bean, parseQuery(uri.getQuery(), null));
+   }
+
+   public URI newURI(T bean) throws Exception
+   {
+      return internalNewURI(bean);
+   }
+
    private URIFactory<T> parentFactory;
 
 
@@ -102,6 +114,8 @@ public abstract class URISchema<T>
 
    protected abstract T internalNewObject(URI uri, Map<String, String> query) throws Exception;
 
+   protected abstract URI internalNewURI(T bean) throws Exception;
+
    private static final BeanUtilsBean beanUtils = new BeanUtilsBean();
 
 
@@ -185,4 +199,74 @@ public abstract class URISchema<T>
       }
       return obj;
    }
+
+   public static void setData(URI uri, HashMap<String, Object> properties, Set<String> allowableProperties, Map<String, String> query)
+   {
+      if (allowableProperties.contains("host"))
+      {
+         properties.put("host", uri.getHost());
+      }
+      if (allowableProperties.contains("port"))
+      {
+         properties.put("port", uri.getPort());
+      }
+      if (allowableProperties.contains("userInfo"))
+      {
+         properties.put("userInfo", uri.getUserInfo());
+      }
+      for (Map.Entry<String, String> entry : query.entrySet())
+      {
+         if (allowableProperties.contains(entry.getKey()))
+         {
+            properties.put(entry.getKey(), entry.getValue());
+         }
+      }
+   }
+
+   public static String getData(List<String> ignored, Object... beans) throws Exception
+   {
+      StringBuilder sb = new StringBuilder();
+      synchronized (beanUtils)
+      {
+         for (Object bean : beans)
+         {
+            if (bean != null)
+            {
+               PropertyDescriptor[] descriptors = beanUtils.getPropertyUtils().getPropertyDescriptors(bean);
+               for (PropertyDescriptor descriptor : descriptors)
+               {
+                  if (descriptor.getReadMethod() != null && descriptor.getWriteMethod() != null && isWriteable(descriptor, ignored))
+                  {
+                     String value = beanUtils.getProperty(bean, descriptor.getName());
+                     if (value != null)
+                     {
+                        sb.append("&").append(descriptor.getName()).append("=").append(value);
+                     }
+                  }
+               }
+            }
+         }
+      }
+      return sb.toString();
+   }
+
+   private static boolean isWriteable(PropertyDescriptor descriptor, List<String> ignored)
+   {
+      if (ignored != null && ignored.contains(descriptor.getName()))
+      {
+         return false;
+      }
+      Class<?> type = descriptor.getPropertyType();
+      return (type == Double.class) ||
+             (type == double.class) ||
+             (type == Long.class) ||
+             (type == long.class) ||
+             (type == Integer.class) ||
+             (type == int.class) ||
+             (type == Float.class) ||
+             (type == float.class) ||
+             (type == Boolean.class) ||
+             (type == boolean.class) ||
+             (type == String.class);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java b/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
index be389c6..1654ec4 100644
--- a/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
+++ b/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
@@ -109,6 +109,12 @@ public class URIParserTest
       {
          return setData(uri, new Fruit(getSchemaName()), query);
       }
+
+      @Override
+      protected URI internalNewURI(FruitBase bean)
+      {
+         return null;
+      }
    }
 
    class FruitBaseSchema extends URISchema<FruitBase>
@@ -125,6 +131,12 @@ public class URIParserTest
       {
          return setData(uri, new FruitBase(getSchemaName()), query);
       }
+
+      @Override
+      protected URI internalNewURI(FruitBase bean)
+      {
+         return null;
+      }
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
index 2091c84..8fbe217 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.api.core;
 
-import java.io.Serializable;
 
+import java.io.Serializable;
 
 public interface BroadcastEndpointFactory extends Serializable
 {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java
deleted file mode 100644
index c600c88..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-import java.io.Serializable;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         9/25/12
- */
-public interface BroadcastEndpointFactoryConfiguration extends Serializable
-{
-   BroadcastEndpointFactory createBroadcastEndpointFactory();
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
index 6714830..a27b543 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
@@ -38,7 +38,7 @@ public final class BroadcastGroupConfiguration implements Serializable
 
    private long broadcastPeriod = ActiveMQDefaultConfiguration.getDefaultBroadcastPeriod();
 
-   private BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration = null;
+   private BroadcastEndpointFactory endpointFactory = null;
 
    private List<String> connectorInfos = null;
 
@@ -79,14 +79,14 @@ public final class BroadcastGroupConfiguration implements Serializable
       return this;
    }
 
-   public BroadcastEndpointFactoryConfiguration getEndpointFactoryConfiguration()
+   public BroadcastEndpointFactory getEndpointFactory()
    {
-      return endpointFactoryConfiguration;
+      return endpointFactory;
    }
 
-   public BroadcastGroupConfiguration setEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration)
+   public BroadcastGroupConfiguration setEndpointFactory(BroadcastEndpointFactory endpointFactory)
    {
-      this.endpointFactoryConfiguration = endpointFactoryConfiguration;
+      this.endpointFactory = endpointFactory;
       return this;
    }
 
@@ -97,7 +97,7 @@ public final class BroadcastGroupConfiguration implements Serializable
       int result = 1;
       result = prime * result + (int)(broadcastPeriod ^ (broadcastPeriod >>> 32));
       result = prime * result + ((connectorInfos == null) ? 0 : connectorInfos.hashCode());
-      result = prime * result + ((endpointFactoryConfiguration == null) ? 0 : endpointFactoryConfiguration.hashCode());
+      result = prime * result + ((endpointFactory == null) ? 0 : endpointFactory.hashCode());
       result = prime * result + ((name == null) ? 0 : name.hashCode());
       return result;
    }
@@ -121,12 +121,12 @@ public final class BroadcastGroupConfiguration implements Serializable
       }
       else if (!connectorInfos.equals(other.connectorInfos))
          return false;
-      if (endpointFactoryConfiguration == null)
+      if (endpointFactory == null)
       {
-         if (other.endpointFactoryConfiguration != null)
+         if (other.endpointFactory != null)
             return false;
       }
-      else if (!endpointFactoryConfiguration.equals(other.endpointFactoryConfiguration))
+      else if (!endpointFactory.equals(other.endpointFactory))
          return false;
       if (name == null)
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java
new file mode 100644
index 0000000..ce77d6f
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+
+/**
+* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+*/
+public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+   private final JChannel channel;
+
+   private final String channelName;
+
+   public ChannelBroadcastEndpointFactory(JChannel channel, String channelName)
+   {
+      this.channel = channel;
+      this.channelName = channelName;
+   }
+
+   @Override
+   public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+   {
+      return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
index 5789cc6..cbe0d7a 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.api.core;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 import org.apache.activemq.api.core.client.ActiveMQClient;
@@ -48,29 +45,9 @@ public final class DiscoveryGroupConfiguration implements Serializable
    private long discoveryInitialWaitTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
 
    /*
-   * The localBindAddress is needed so we can be backward compatible with 2.2 clients
-   * */
-   private transient String localBindAddress = null;
-
-   /*
-   * The localBindPort is needed so we can be backward compatible with 2.2 clients
-   * */
-   private transient int localBindPort = -1;
-
-   /*
-   * The groupAddress is needed so we can be backward compatible with 2.2 clients
-   * */
-   private String groupAddress = null;
-
-   /*
-   * The groupPort is needed so we can be backward compatible with 2.2 clients
-   * */
-   private int groupPort = -1;
-
-   /*
    * This is the actual object used by the class, it has to be transient so we can handle deserialization with a 2.2 client
    * */
-   private transient BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration;
+   private BroadcastEndpointFactory endpointFactory;
 
    public DiscoveryGroupConfiguration()
    {
@@ -121,51 +98,17 @@ public final class DiscoveryGroupConfiguration implements Serializable
       return this;
    }
 
-   public BroadcastEndpointFactoryConfiguration getBroadcastEndpointFactoryConfiguration()
+   public BroadcastEndpointFactory getBroadcastEndpointFactory()
    {
-      return endpointFactoryConfiguration;
+      return endpointFactory;
    }
 
-   public DiscoveryGroupConfiguration setBroadcastEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration)
+   public DiscoveryGroupConfiguration setBroadcastEndpointFactory(BroadcastEndpointFactory endpointFactory)
    {
-      this.endpointFactoryConfiguration = endpointFactoryConfiguration;
-      if (endpointFactoryConfiguration instanceof DiscoveryGroupConfigurationCompatibilityHelper)
-      {
-         DiscoveryGroupConfigurationCompatibilityHelper dgcch = (DiscoveryGroupConfigurationCompatibilityHelper) endpointFactoryConfiguration;
-         localBindAddress = dgcch.getLocalBindAddress();
-         localBindPort = dgcch.getLocalBindPort();
-         groupAddress = dgcch.getGroupAddress();
-         groupPort = dgcch.getGroupPort();
-      }
+      this.endpointFactory = endpointFactory;
       return this;
    }
 
-   private void writeObject(ObjectOutputStream out) throws IOException
-   {
-      out.defaultWriteObject();
-      if (groupPort < 0)
-      {
-         out.writeObject(endpointFactoryConfiguration);
-      }
-   }
-
-   private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException
-   {
-      in.defaultReadObject();
-      if (groupPort < 0)
-      {
-         endpointFactoryConfiguration = (BroadcastEndpointFactoryConfiguration) in.readObject();
-      }
-      else
-      {
-         endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
-            .setGroupAddress(groupAddress)
-            .setGroupPort(groupPort)
-            .setLocalBindAddress(localBindAddress)
-            .setLocalBindPort(localBindPort);
-      }
-   }
-
    @Override
    public boolean equals(Object o)
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java
deleted file mode 100644
index 4fd1bca..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-/**
- * This interface is needed for making a DiscoveryGroupConfiguration backward
- * compatible with version 2.2 clients. It is used to extract from new
- * {@link org.apache.activemq.api.core.BroadcastEndpointFactoryConfiguration} the four
- * UDP attributes in order to form a version 2.2 DiscoveryGroupConfiguration
- * in time of serialization.
- *
- * @see DiscoveryGroupConfiguration#readObject(java.io.ObjectInputStream)
- * @see DiscoveryGroupConfiguration#writeObject(java.io.ObjectOutputStream)
- *
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         12/13/12
- */
-public interface DiscoveryGroupConfigurationCompatibilityHelper
-{
-// XXX No javadocs
-   String getLocalBindAddress();
-
-// XXX No javadocs
-   int getLocalBindPort();
-
-// XXX No javadocs
-   String getGroupAddress();
-
-// XXX No javadocs
-   int getGroupPort();
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java
new file mode 100644
index 0000000..07381ed
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+import org.jgroups.ReceiverAdapter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for ActiveMQ member discovery and broadcast over a JGroups channel.
+ * <p/>
+ * Subclasses supply the underlying {@link JChannel} through {@link #createChannel()}.
+ * Channels are shared per channel name through {@code JChannelManager} and reference
+ * counted by {@code JChannelWrapper}: {@link #initChannel()} acquires one reference and
+ * every call to {@link #close(boolean)} releases one.
+ *
+ * @author Howard Gao
+ */
+public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
+{
+   private final String channelName;
+
+   private boolean clientOpened;
+
+   private boolean broadcastOpened;
+
+   private JChannelWrapper channel;
+
+   private JGroupsReceiver receiver;
+
+   /**
+    * @param channelName name under which the shared channel is registered and connected
+    */
+   public JGroupsBroadcastEndpoint(String channelName)
+   {
+      this.channelName = channelName;
+   }
+
+   /**
+    * Sends the given data as a JGroups message. Does nothing unless
+    * {@link #openBroadcaster()} has been called.
+    *
+    * @param data the payload to broadcast
+    */
+   public void broadcast(final byte[] data) throws Exception
+   {
+      if (broadcastOpened)
+      {
+         org.jgroups.Message msg = new org.jgroups.Message();
+
+         msg.setBuffer(data);
+
+         channel.send(msg);
+      }
+   }
+
+   /**
+    * Waits for the next received broadcast.
+    *
+    * @return the received payload, or {@code null} if the client side has not been opened
+    */
+   public byte[] receiveBroadcast() throws Exception
+   {
+      if (clientOpened)
+      {
+         return receiver.receiveBroadcast();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * Waits up to the given time for the next received broadcast.
+    *
+    * @param time maximum time to wait
+    * @param unit unit of {@code time}
+    * @return the received payload, or {@code null} if nothing arrived in time or the
+    *         client side has not been opened
+    */
+   public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
+   {
+      if (clientOpened)
+      {
+         return receiver.receiveBroadcast(time, unit);
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * Opens the receiving side: connects the shared channel and registers a new receiver
+    * with it. Does nothing if the client side is already open.
+    */
+   public synchronized void openClient() throws Exception
+   {
+      if (clientOpened)
+      {
+         return;
+      }
+      internalOpen();
+      receiver = new JGroupsReceiver();
+      channel.setReceiver(receiver);
+      clientOpened = true;
+   }
+
+   /**
+    * Opens the sending side by connecting the shared channel. Does nothing if the
+    * broadcaster is already open.
+    */
+   public synchronized void openBroadcaster() throws Exception
+   {
+      if (broadcastOpened) return;
+      internalOpen();
+      broadcastOpened = true;
+   }
+
+   /**
+    * Creates the JGroups channel backing this endpoint. Invoked by
+    * {@code JChannelManager} only when no channel is registered yet under this
+    * endpoint's channel name.
+    *
+    * @return the channel to wrap and share
+    */
+   public abstract JChannel createChannel() throws Exception;
+
+   /**
+    * Acquires a reference to the shared channel for this endpoint's channel name,
+    * creating it if necessary. Must be called before the endpoint is opened, since the
+    * open methods dereference the channel assigned here.
+    *
+    * @return this endpoint
+    */
+   public JGroupsBroadcastEndpoint initChannel() throws Exception
+   {
+      this.channel = JChannelManager.getJChannel(channelName, this);
+      return this;
+   }
+
+   /**
+    * Connects the shared channel.
+    */
+   protected void internalOpen() throws Exception
+   {
+      channel.connect();
+   }
+
+   /**
+    * Closes one side of this endpoint and releases one reference on the shared channel.
+    *
+    * @param isBroadcast {@code true} to close the broadcaster side, {@code false} to
+    *                    unregister the receiver and close the client side
+    */
+   public synchronized void close(boolean isBroadcast) throws Exception
+   {
+      if (isBroadcast)
+      {
+         broadcastOpened = false;
+      }
+      else
+      {
+         channel.removeReceiver(receiver);
+         clientOpened = false;
+      }
+      // NOTE(review): every call releases a reference while only initChannel() acquires one;
+      // presumably an endpoint is used for a single role and closed once - confirm with callers.
+      channel.close();
+   }
+
+   /**
+    * This class is used to receive messages from a JGroups channel.
+    * Incoming messages are put into a queue.
+    */
+   private static final class JGroupsReceiver extends ReceiverAdapter
+   {
+      private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<byte[]>();
+
+      /**
+       * Queues the buffer of the incoming message.
+       */
+      @Override
+      public void receive(org.jgroups.Message msg)
+      {
+         dequeue.add(msg.getBuffer());
+      }
+
+      /**
+       * Takes the next queued buffer, waiting until one is available.
+       */
+      public byte[] receiveBroadcast() throws Exception
+      {
+         return dequeue.take();
+      }
+
+      /**
+       * Polls the next queued buffer, waiting up to the given time.
+       *
+       * @return the buffer, or {@code null} if the wait timed out
+       */
+      public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
+      {
+         return dequeue.poll(time, unit);
+      }
+   }
+
+   /**
+    * This class wraps a JChannel with a reference counter. The reference counter
+    * controls the life of the JChannel. When reference count is zero, the channel
+    * will be disconnected.
+    * <p/>
+    * {@code refCount} is guarded by the {@code JChannelManager} class lock so that
+    * acquiring a reference in {@code getJChannel()} and releasing one in
+    * {@link #close()} cannot interleave.
+    */
+   protected static class JChannelWrapper
+   {
+      int refCount = 1;
+      JChannel channel;
+      String channelName;
+      List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>();
+
+      /**
+       * @param channelName name the channel is registered and connected under
+       * @param channel     the channel to wrap; the wrapper starts with one reference
+       */
+      public JChannelWrapper(String channelName, JChannel channel) throws Exception
+      {
+         this.refCount = 1;
+         this.channelName = channelName;
+         this.channel = channel;
+      }
+
+      /**
+       * Releases one reference and closes the channel through the manager when the
+       * count reaches zero.
+       */
+      public void close()
+      {
+         // Lock order is manager first, then wrapper - the same order getJChannel() uses.
+         // Holding the manager lock across decrement and removal prevents getJChannel()
+         // from handing out a wrapper that is about to be closed.
+         synchronized (JChannelManager.class)
+         {
+            // the wrapper lock keeps close() mutually exclusive with connect()
+            synchronized (this)
+            {
+               refCount--;
+               if (refCount == 0)
+               {
+                  JChannelManager.closeChannel(this.channelName, channel);
+               }
+            }
+         }
+      }
+
+      /**
+       * Unregisters a receiver so it no longer gets incoming messages.
+       */
+      public void removeReceiver(JGroupsReceiver receiver)
+      {
+         synchronized (receivers)
+         {
+            receivers.remove(receiver);
+         }
+      }
+
+      /**
+       * Connects the channel under the channel name, installing a receiver that fans
+       * every incoming message out to all registered receivers. Does nothing if the
+       * channel is already connected.
+       */
+      public synchronized void connect() throws Exception
+      {
+         if (channel.isConnected()) return;
+         channel.setReceiver(new ReceiverAdapter()
+         {
+
+            @Override
+            public void receive(org.jgroups.Message msg)
+            {
+               synchronized (receivers)
+               {
+                  for (JGroupsReceiver r : receivers)
+                  {
+                     r.receive(msg);
+                  }
+               }
+            }
+         });
+         channel.connect(channelName);
+      }
+
+      /**
+       * Registers an additional receiver; despite the name, existing receivers are kept.
+       */
+      public void setReceiver(JGroupsReceiver jGroupsReceiver)
+      {
+         synchronized (receivers)
+         {
+            receivers.add(jGroupsReceiver);
+         }
+      }
+
+      /**
+       * Sends a message on the wrapped channel.
+       */
+      public void send(org.jgroups.Message msg) throws Exception
+      {
+         channel.send(msg);
+      }
+
+      /**
+       * Acquires one more reference.
+       *
+       * @return this wrapper
+       */
+      public JChannelWrapper addRef()
+      {
+         // reentrant when called from getJChannel(), which already holds this lock
+         synchronized (JChannelManager.class)
+         {
+            this.refCount++;
+         }
+         return this;
+      }
+
+      @Override
+      public String toString()
+      {
+         return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
+      }
+   }
+
+   /**
+    * This class maintains a global Map of JChannels wrapped in JChannelWrapper for
+    * the purpose of reference counting.
+    * <p/>
+    * Wherever a JChannel is needed it should only be obtained by calling the getJChannel()
+    * method of this class. The real disconnect of channels is also done here only.
+    */
+   protected static class JChannelManager
+   {
+      private static Map<String, JChannelWrapper> channels;
+
+      /**
+       * Returns the wrapper registered under the channel name with one more reference,
+       * or creates and registers a new wrapper around {@code endpoint.createChannel()}.
+       *
+       * @param channelName name the channel is shared under
+       * @param endpoint    endpoint asked to create the channel when none is registered
+       * @return the shared wrapper
+       */
+      public static synchronized JChannelWrapper getJChannel(String channelName, JGroupsBroadcastEndpoint endpoint) throws Exception
+      {
+         if (channels == null)
+         {
+            channels = new HashMap<>();
+         }
+         JChannelWrapper wrapper = channels.get(channelName);
+         if (wrapper == null)
+         {
+            wrapper = new JChannelWrapper(channelName, endpoint.createChannel());
+            channels.put(channelName, wrapper);
+            return wrapper;
+         }
+         return wrapper.addRef();
+      }
+
+      /**
+       * Detaches the receiver, disconnects and closes the channel, then removes its
+       * wrapper from the registry.
+       *
+       * @throws IllegalStateException if no wrapper is registered under the channel name
+       */
+      public static synchronized void closeChannel(String channelName, JChannel channel)
+      {
+         channel.setReceiver(null);
+         channel.disconnect();
+         channel.close();
+         JChannelWrapper wrapper = channels.remove(channelName);
+         if (wrapper == null)
+         {
+            throw new IllegalStateException("Did not find channel " + channelName);
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java
deleted file mode 100644
index 9d2a0ac..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import org.jgroups.JChannel;
-import org.jgroups.Message;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.conf.PlainConfigurator;
-
-/**
- * The configuration for creating broadcasting/discovery groups using JGroups channels
- * There are two ways to constructing a JGroups channel (JChannel):
- * <ol>
- * <li> by passing in a JGroups configuration file<br>
- * The file must exists in the activemq classpath. ActiveMQ creates a JChannel with the
- * configuration file and use it for broadcasting and discovery. In standalone server
- * mode ActiveMQ uses this way for constructing JChannels.</li>
- * <li> by passing in a JChannel instance<br>
- * This is useful when ActiveMQ needs to get a JChannel from a running JGroups service as in the
- * case of AS7 integration.</li>
- * </ol>
- * <p>
- * Note only one JChannel is needed in a VM. To avoid the channel being prematurely disconnected
- * by any party, a wrapper class is used.
- *
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
- * @see JChannelWrapper, JChannelManager
- */
-public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper
-{
-   private static final long serialVersionUID = 8952238567248461285L;
-
-   private final BroadcastEndpointFactory factory;
-
-   public JGroupsBroadcastGroupConfiguration(final String jgroupsFile, final String channelName)
-   {
-      factory = new BroadcastEndpointFactory()
-      {
-         private static final long serialVersionUID = 1047956472941098435L;
-
-         @Override
-         public BroadcastEndpoint createBroadcastEndpoint() throws Exception
-         {
-            JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint();
-            endpoint.initChannel(jgroupsFile, channelName);
-            return endpoint;
-         }
-      };
-   }
-
-   public JGroupsBroadcastGroupConfiguration(final JChannel channel, final String channelName)
-   {
-      factory = new BroadcastEndpointFactory()
-      {
-         private static final long serialVersionUID = 5110372849181145377L;
-
-         @Override
-         public BroadcastEndpoint createBroadcastEndpoint() throws Exception
-         {
-            JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint();
-            endpoint.initChannel(channel, channelName);
-            return endpoint;
-         }
-      };
-   }
-
-   @Override
-   public BroadcastEndpointFactory createBroadcastEndpointFactory()
-   {
-      return factory;
-   }
-
-   @Override
-   public String getLocalBindAddress()
-   {
-      return null;
-   }
-
-   @Override
-   /*
-   * return -1 to force deserialization of object
-   * */
-   public int getLocalBindPort()
-   {
-      return -1;
-   }
-
-   @Override
-   public String getGroupAddress()
-   {
-      return null;
-   }
-
-   @Override
-   public int getGroupPort()
-   {
-      return -1;
-   }
-
-   /**
-    * This class is the implementation of ActiveMQ members discovery that will use JGroups.
-    *
-    * @author Howard Gao
-    */
-   private static final class JGroupsBroadcastEndpoint implements BroadcastEndpoint
-   {
-      private boolean clientOpened;
-
-      private boolean broadcastOpened;
-
-      private JChannelWrapper<?> channel;
-
-      private JGroupsReceiver receiver;
-
-      public void broadcast(final byte[] data) throws Exception
-      {
-         if (broadcastOpened)
-         {
-            Message msg = new Message();
-
-            msg.setBuffer(data);
-
-            channel.send(msg);
-         }
-      }
-
-      public byte[] receiveBroadcast() throws Exception
-      {
-         if (clientOpened)
-         {
-            return receiver.receiveBroadcast();
-         }
-         else
-         {
-            return null;
-         }
-      }
-
-      public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
-      {
-         if (clientOpened)
-         {
-            return receiver.receiveBroadcast(time, unit);
-         }
-         else
-         {
-            return null;
-         }
-      }
-
-      public synchronized void openClient() throws Exception
-      {
-         if (clientOpened)
-         {
-            return;
-         }
-         internalOpen();
-         receiver = new JGroupsReceiver();
-         channel.setReceiver(receiver);
-         clientOpened = true;
-      }
-
-      public synchronized void openBroadcaster() throws Exception
-      {
-         if (broadcastOpened) return;
-         internalOpen();
-         broadcastOpened = true;
-      }
-
-      private void initChannel(final String jgroupsConfig, final String channelName) throws Exception
-      {
-         PlainConfigurator configurator = new PlainConfigurator(jgroupsConfig);
-         try
-         {
-            this.channel = JChannelManager.getJChannel(channelName, configurator);
-            return;
-         }
-         catch (Exception e)
-         {
-            this.channel = null;
-         }
-         URL configURL = Thread.currentThread().getContextClassLoader().getResource(jgroupsConfig);
-
-         if (configURL == null)
-         {
-            throw new RuntimeException("couldn't find JGroups configuration " + jgroupsConfig);
-         }
-         this.channel = JChannelManager.getJChannel(channelName, configURL);
-      }
-
-      private void initChannel(final JChannel channel1, final String channelName) throws Exception
-      {
-         this.channel = JChannelManager.getJChannel(channelName, channel1);
-      }
-
-      protected void internalOpen() throws Exception
-      {
-         channel.connect();
-      }
-
-      public synchronized void close(boolean isBroadcast) throws Exception
-      {
-         if (isBroadcast)
-         {
-            broadcastOpened = false;
-         }
-         else
-         {
-            channel.removeReceiver(receiver);
-            clientOpened = false;
-         }
-         channel.close();
-      }
-
-      /**
-       * This class is used to receive messages from a JGroups channel.
-       * Incoming messages are put into a queue.
-       */
-      private static final class JGroupsReceiver extends ReceiverAdapter
-      {
-         private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<byte[]>();
-
-         @Override
-         public void receive(org.jgroups.Message msg)
-         {
-            dequeue.add(msg.getBuffer());
-         }
-
-         public byte[] receiveBroadcast() throws Exception
-         {
-            return dequeue.take();
-         }
-
-         public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
-         {
-            return dequeue.poll(time, unit);
-         }
-      }
-
-      /**
-       * This class wraps a JChannel with a reference counter. The reference counter
-       * controls the life of the JChannel. When reference count is zero, the channel
-       * will be disconnected.
-       *
-       * @param <T>
-       */
-      private static class JChannelWrapper<T>
-      {
-         int refCount = 1;
-         JChannel channel;
-         String channelName;
-         List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>();
-
-         public JChannelWrapper(String channelName, T t) throws Exception
-         {
-            this.refCount = 1;
-            this.channelName = channelName;
-            if (t instanceof URL)
-            {
-               this.channel = new JChannel((URL) t);
-            }
-            else if (t instanceof JChannel)
-            {
-               this.channel = (JChannel) t;
-            }
-            else if (t instanceof PlainConfigurator)
-            {
-               this.channel = new JChannel((PlainConfigurator)t);
-            }
-            else
-            {
-               throw new IllegalArgumentException("Unsupported type " + t);
-            }
-         }
-
-         public synchronized void close()
-         {
-            refCount--;
-            if (refCount == 0)
-            {
-               JChannelManager.closeChannel(this.channelName, channel);
-            }
-         }
-
-         public void removeReceiver(JGroupsReceiver receiver)
-         {
-            synchronized (receivers)
-            {
-               receivers.remove(receiver);
-            }
-         }
-
-         public synchronized void connect() throws Exception
-         {
-            if (channel.isConnected()) return;
-            channel.setReceiver(new ReceiverAdapter()
-            {
-
-               @Override
-               public void receive(Message msg)
-               {
-                  synchronized (receivers)
-                  {
-                     for (JGroupsReceiver r : receivers)
-                     {
-                        r.receive(msg);
-                     }
-                  }
-               }
-            });
-            channel.connect(channelName);
-         }
-
-         public void setReceiver(JGroupsReceiver jGroupsReceiver)
-         {
-            synchronized (receivers)
-            {
-               receivers.add(jGroupsReceiver);
-            }
-         }
-
-         public void send(Message msg) throws Exception
-         {
-            channel.send(msg);
-         }
-
-         public JChannelWrapper<T> addRef()
-         {
-            this.refCount++;
-            return this;
-         }
-
-         @Override
-         public String toString()
-         {
-            return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
-         }
-      }
-
-      /**
-       * This class maintain a global Map of JChannels wrapped in JChannelWrapper for
-       * the purpose of reference counting.
-       * <p/>
-       * Wherever a JChannel is needed it should only get it by calling the getChannel()
-       * method of this class. The real disconnect of channels are also done here only.
-       */
-      private static class JChannelManager
-      {
-         private static Map<String, JChannelWrapper<?>> channels;
-
-         public static synchronized <T> JChannelWrapper<?> getJChannel(String channelName, T t) throws Exception
-         {
-            if (channels == null)
-            {
-               channels = new HashMap<String, JChannelWrapper<?>>();
-            }
-            JChannelWrapper<?> wrapper = channels.get(channelName);
-            if (wrapper == null)
-            {
-               wrapper = new JChannelWrapper<T>(channelName, t);
-               channels.put(channelName, wrapper);
-               return wrapper;
-            }
-            return wrapper.addRef();
-         }
-
-         public static synchronized void closeChannel(String channelName, JChannel channel)
-         {
-            channel.setReceiver(null);
-            channel.disconnect();
-            channel.close();
-            JChannelWrapper<?> wrapper = channels.remove(channelName);
-            if (wrapper == null)
-            {
-               throw new IllegalStateException("Did not find channel " + channelName);
-            }
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java
new file mode 100644
index 0000000..e14d0f0
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+
+/**
+ * A JGroups broadcast endpoint backed by a channel instance handed in by the caller
+ * rather than one built from configuration.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint
+{
+   private final JChannel providedChannel;
+
+   /**
+    * @param jChannel    the channel returned by {@link #createChannel()}
+    * @param channelName the channel name passed to the superclass
+    */
+   public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws Exception
+   {
+      super(channelName);
+      providedChannel = jChannel;
+   }
+
+   /**
+    * @return the channel supplied at construction time; no new channel is created
+    */
+   @Override
+   public JChannel createChannel() throws Exception
+   {
+      return this.providedChannel;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java
new file mode 100644
index 0000000..6f0b8ad
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+
+import java.net.URL;
+
+/**
+ * A JGroups broadcast endpoint whose channel is built from a JGroups configuration
+ * resource looked up through the thread context class loader.
+ *
+ * @author Howard Gao
+ */
+public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint
+{
+   // assigned once in the constructor, so it is final like the field of the channel-based sibling
+   private final String file;
+
+   /**
+    * @param file        name of the JGroups configuration resource
+    * @param channelName the channel name passed to the superclass
+    */
+   public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws Exception
+   {
+      super(channelName);
+      this.file = file;
+   }
+
+   /**
+    * Creates a new channel from the configuration resource.
+    *
+    * @return a new {@link JChannel} built from the resolved configuration URL
+    * @throws RuntimeException if the thread context class loader cannot find the resource
+    */
+   @Override
+   public JChannel createChannel() throws Exception
+   {
+      URL configURL = Thread.currentThread().getContextClassLoader().getResource(file);
+
+      if (configURL == null)
+      {
+         throw new RuntimeException("couldn't find JGroups configuration " + file);
+      }
+
+      return new JChannel(configURL);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java
new file mode 100644
index 0000000..21bef7a
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+/**
+ * {@link BroadcastEndpointFactory} that produces {@link JGroupsFileBroadcastEndpoint}s.
+ * <p>
+ * The JGroups configuration file name and the channel name are supplied through the fluent
+ * setters and are passed to the endpoint constructor when {@link #createBroadcastEndpoint()} is called.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+   private String file;
+
+   private String channelName;
+
+   /**
+    * Creates an endpoint for the current file/channel name and returns the result of its
+    * {@code initChannel()} call.
+    *
+    * @throws Exception if constructing the endpoint or initialising its channel fails
+    */
+   @Override
+   public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+   {
+      final JGroupsFileBroadcastEndpoint endpoint = new JGroupsFileBroadcastEndpoint(file, channelName);
+      return endpoint.initChannel();
+   }
+
+   /** @return the channel name given to created endpoints, or {@code null} if none was set */
+   public String getChannelName()
+   {
+      return this.channelName;
+   }
+
+   /**
+    * @param channelName the channel name given to created endpoints
+    * @return this factory, to allow call chaining
+    */
+   public JGroupsFileBroadcastEndpointFactory setChannelName(String channelName)
+   {
+      this.channelName = channelName;
+      return this;
+   }
+
+   /** @return the JGroups configuration file name, or {@code null} if none was set */
+   public String getFile()
+   {
+      return this.file;
+   }
+
+   /**
+    * @param file the JGroups configuration file name given to created endpoints
+    * @return this factory, to allow call chaining
+    */
+   public JGroupsFileBroadcastEndpointFactory setFile(String file)
+   {
+      this.file = file;
+      return this;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java
new file mode 100644
index 0000000..ddc5c19
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+import org.jgroups.conf.PlainConfigurator;
+
+/**
+ * A {@link JGroupsBroadcastEndpoint} whose JGroups channel is configured from a properties
+ * string that is handed to a {@link PlainConfigurator}.
+ *
+ * @author Howard Gao
+ */
+public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint
+{
+   /** Configuration string passed to {@link PlainConfigurator} when the channel is created. */
+   private String properties;
+
+   /**
+    * @param properties  JGroups configuration string
+    * @param channelName handed unchanged to the {@link JGroupsBroadcastEndpoint} constructor
+    * @throws Exception if the superclass constructor fails
+    */
+   public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName) throws Exception
+   {
+      super(channelName);
+      this.properties = properties;
+   }
+
+   /**
+    * Builds a channel from the stored properties string.
+    *
+    * @return a new {@link JChannel} created from a {@link PlainConfigurator} over the properties
+    * @throws Exception if the configurator or the channel cannot be created
+    */
+   @Override
+   public JChannel createChannel() throws Exception
+   {
+      return new JChannel(new PlainConfigurator(properties));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
new file mode 100644
index 0000000..b9d06b9
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+/**
+ * {@link BroadcastEndpointFactory} that produces {@link JGroupsPropertiesBroadcastEndpoint}s.
+ * <p>
+ * The JGroups properties string and the channel name are supplied through the fluent setters
+ * and are passed to the endpoint constructor when {@link #createBroadcastEndpoint()} is called.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+   private String properties;
+
+   private String channelName;
+
+   /**
+    * Creates an endpoint for the current properties/channel name and returns the result of its
+    * {@code initChannel()} call.
+    *
+    * @throws Exception if constructing the endpoint or initialising its channel fails
+    */
+   @Override
+   public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+   {
+      final JGroupsPropertiesBroadcastEndpoint endpoint =
+         new JGroupsPropertiesBroadcastEndpoint(properties, channelName);
+      return endpoint.initChannel();
+   }
+
+   /** @return the channel name given to created endpoints, or {@code null} if none was set */
+   public String getChannelName()
+   {
+      return this.channelName;
+   }
+
+   /**
+    * @param channelName the channel name given to created endpoints
+    * @return this factory, to allow call chaining
+    */
+   public JGroupsPropertiesBroadcastEndpointFactory setChannelName(String channelName)
+   {
+      this.channelName = channelName;
+      return this;
+   }
+
+   /** @return the JGroups properties string, or {@code null} if none was set */
+   public String getProperties()
+   {
+      return this.properties;
+   }
+
+   /**
+    * @param properties the JGroups properties string given to created endpoints
+    * @return this factory, to allow call chaining
+    */
+   public JGroupsPropertiesBroadcastEndpointFactory setProperties(String properties)
+   {
+      this.properties = properties;
+      return this;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java
new file mode 100644
index 0000000..b2f6095
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.core.client.ActiveMQClientLogger;
+
+
+/**
+ * The configuration used to determine how the server will broadcast members.
+ * <p>
+ * This is analogous to {@link org.apache.activemq.api.core.DiscoveryGroupConfiguration}
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> Created 18 Nov 2008 08:44:30
+ */
+public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+   private transient String localBindAddress = null;
+
+   private transient int localBindPort = -1;
+
+   private String groupAddress = null;
+
+   private int groupPort = -1;
+
+   public UDPBroadcastEndpointFactory()
+   {
+   }
+
+   public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+   {
+      return new UDPBroadcastEndpoint()
+         .setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null)
+         .setGroupPort(groupPort)
+         .setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null)
+         .setLocalBindPort(localBindPort);
+   }
+
+   public String getGroupAddress()
+   {
+      return groupAddress;
+   }
+
+   public UDPBroadcastEndpointFactory setGroupAddress(String groupAddress)
+   {
+      this.groupAddress = groupAddress;
+      return this;
+   }
+
+   public int getGroupPort()
+   {
+      return groupPort;
+   }
+
+   public UDPBroadcastEndpointFactory setGroupPort(int groupPort)
+   {
+      this.groupPort = groupPort;
+      return this;
+   }
+
+   public int getLocalBindPort()
+   {
+      return localBindPort;
+   }
+
+   public UDPBroadcastEndpointFactory setLocalBindPort(int localBindPort)
+   {
+      this.localBindPort = localBindPort;
+      return this;
+   }
+
+   public String getLocalBindAddress()
+   {
+      return localBindAddress;
+   }
+
+   public UDPBroadcastEndpointFactory setLocalBindAddress(String localBindAddress)
+   {
+      this.localBindAddress = localBindAddress;
+      return this;
+   }
+
+   /**
+    * <p> This is the member discovery implementation using direct UDP. It was extracted as a refactoring from
+    * {@link org.apache.activemq.core.cluster.DiscoveryGroup}</p>
+    *
+    * @author Tomohisa
+    * @author Howard Gao
+    * @author Clebert Suconic
+    */
+   private static class UDPBroadcastEndpoint implements BroadcastEndpoint
+   {
+      private static final int SOCKET_TIMEOUT = 500;
+
+      private InetAddress localAddress;
+
+      private int localBindPort;
+
+      private InetAddress groupAddress;
+
+      private int groupPort;
+
+      private DatagramSocket broadcastingSocket;
+
+      private MulticastSocket receivingSocket;
+
+      private volatile boolean open;
+
+      public UDPBroadcastEndpoint()
+      {
+      }
+
+      public UDPBroadcastEndpoint setGroupAddress(InetAddress groupAddress)
+      {
+         this.groupAddress = groupAddress;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setGroupPort(int groupPort)
+      {
+         this.groupPort = groupPort;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setLocalBindAddress(InetAddress localAddress)
+      {
+         this.localAddress = localAddress;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setLocalBindPort(int localBindPort)
+      {
+         this.localBindPort = localBindPort;
+         return this;
+      }
+
+
+      public void broadcast(byte[] data) throws Exception
+      {
+         DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
+         broadcastingSocket.send(packet);
+      }
+
+      public byte[] receiveBroadcast() throws Exception
+      {
+         final byte[] data = new byte[65535];
+         final DatagramPacket packet = new DatagramPacket(data, data.length);
+
+         while (open)
+         {
+            try
+            {
+               receivingSocket.receive(packet);
+            }
+            // TODO: Do we need this?
+            catch (InterruptedIOException e)
+            {
+               continue;
+            }
+            catch (IOException e)
+            {
+               if (open)
+               {
+                  ActiveMQClientLogger.LOGGER.warn(this + " getting exception when receiving broadcasting.", e);
+               }
+            }
+            break;
+         }
+         return data;
+      }
+
+      public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
+      {
+         // We just use the regular method on UDP, there's no timeout support
+         // and this is basically for tests only
+         return receiveBroadcast();
+      }
+
+      public void openBroadcaster() throws Exception
+      {
+         if (localBindPort != -1)
+         {
+            broadcastingSocket = new DatagramSocket(localBindPort, localAddress);
+         }
+         else
+         {
+            if (localAddress != null)
+            {
+               ActiveMQClientLogger.LOGGER.broadcastGroupBindError();
+            }
+            broadcastingSocket = new DatagramSocket();
+         }
+
+         open = true;
+      }
+
+      public void openClient() throws Exception
+      {
+         // HORNETQ-874
+         if (checkForLinux() || checkForSolaris() || checkForHp())
+         {
+            try
+            {
+               receivingSocket = new MulticastSocket(new InetSocketAddress(groupAddress, groupPort));
+            }
+            catch (IOException e)
+            {
+               ActiveMQClientLogger.LOGGER.ioDiscoveryError(groupAddress.getHostAddress(), groupAddress instanceof Inet4Address ? "IPv4" : "IPv6");
+
+               receivingSocket = new MulticastSocket(groupPort);
+            }
+         }
+         else
+         {
+            receivingSocket = new MulticastSocket(groupPort);
+         }
+
+         if (localAddress != null)
+         {
+            receivingSocket.setInterface(localAddress);
+         }
+
+         receivingSocket.joinGroup(groupAddress);
+
+         receivingSocket.setSoTimeout(SOCKET_TIMEOUT);
+
+         open = true;
+      }
+
+      //@Todo: using isBroadcast to share endpoint between broadcast and receiving
+      public void close(boolean isBroadcast) throws Exception
+      {
+         open = false;
+
+         if (broadcastingSocket != null)
+         {
+            broadcastingSocket.close();
+         }
+
+         if (receivingSocket != null)
+         {
+            receivingSocket.close();
+         }
+      }
+
+      private static boolean checkForLinux()
+      {
+         return checkForPresence("os.name", "linux");
+      }
+
+      private static boolean checkForHp()
+      {
+         return checkForPresence("os.name", "hp");
+      }
+
+      private static boolean checkForSolaris()
+      {
+         return checkForPresence("os.name", "sun");
+      }
+
+      private static boolean checkForPresence(String key, String value)
+      {
+         try
+         {
+            String tmp = System.getProperty(key);
+            return tmp != null && tmp.trim().toLowerCase().startsWith(value);
+         }
+         catch (Throwable t)
+         {
+            return false;
+         }
+      }
+
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((groupAddress == null) ? 0 : groupAddress.hashCode());
+      result = prime * result + groupPort;
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (obj == null)
+         return false;
+      if (getClass() != obj.getClass())
+         return false;
+      UDPBroadcastEndpointFactory other = (UDPBroadcastEndpointFactory) obj;
+      if (groupAddress == null)
+      {
+         if (other.groupAddress != null)
+            return false;
+      }
+      else if (!groupAddress.equals(other.groupAddress))
+         return false;
+      if (groupPort != other.groupPort)
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java
deleted file mode 100644
index 5c84d79..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.core.client.ActiveMQClientLogger;
-
-
-/**
- * The configuration used to determine how the server will broadcast members.
- * <p>
- * This is analogous to {@link org.apache.activemq.api.core.DiscoveryGroupConfiguration}
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> Created 18 Nov 2008 08:44:30
- */
-public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper
-{
-   private static final long serialVersionUID = 1052413739064253955L;
-
-   private transient String localBindAddress = null;
-
-   private transient int localBindPort = -1;
-
-   private String groupAddress = null;
-
-   private int groupPort = -1;
-
-   public UDPBroadcastGroupConfiguration()
-   {
-   }
-
-   public BroadcastEndpointFactory createBroadcastEndpointFactory()
-   {
-      return new BroadcastEndpointFactory()
-      {
-         @Override
-         public BroadcastEndpoint createBroadcastEndpoint() throws Exception
-         {
-            return new UDPBroadcastEndpoint()
-               .setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null)
-               .setGroupPort(groupPort)
-               .setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null)
-               .setLocalBindPort(localBindPort);
-         }
-      };
-   }
-
-   public String getGroupAddress()
-   {
-      return groupAddress;
-   }
-
-   public UDPBroadcastGroupConfiguration setGroupAddress(String groupAddress)
-   {
-      this.groupAddress = groupAddress;
-      return this;
-   }
-
-   public int getGroupPort()
-   {
-      return groupPort;
-   }
-
-   public UDPBroadcastGroupConfiguration setGroupPort(int groupPort)
-   {
-      this.groupPort = groupPort;
-      return this;
-   }
-
-   public int getLocalBindPort()
-   {
-      return localBindPort;
-   }
-
-   public UDPBroadcastGroupConfiguration setLocalBindPort(int localBindPort)
-   {
-      this.localBindPort = localBindPort;
-      return this;
-   }
-
-   public String getLocalBindAddress()
-   {
-      return localBindAddress;
-   }
-
-   public UDPBroadcastGroupConfiguration setLocalBindAddress(String localBindAddress)
-   {
-      this.localBindAddress = localBindAddress;
-      return this;
-   }
-
-   /**
-    * <p> This is the member discovery implementation using direct UDP. It was extracted as a refactoring from
-    * {@link org.apache.activemq.core.cluster.DiscoveryGroup}</p>
-    *
-    * @author Tomohisa
-    * @author Howard Gao
-    * @author Clebert Suconic
-    */
-   private static class UDPBroadcastEndpoint implements BroadcastEndpoint
-   {
-      private static final int SOCKET_TIMEOUT = 500;
-
-      private InetAddress localAddress;
-
-      private int localBindPort;
-
-      private InetAddress groupAddress;
-
-      private int groupPort;
-
-      private DatagramSocket broadcastingSocket;
-
-      private MulticastSocket receivingSocket;
-
-      private volatile boolean open;
-
-      public UDPBroadcastEndpoint()
-      {
-      }
-
-      public UDPBroadcastEndpoint setGroupAddress(InetAddress groupAddress)
-      {
-         this.groupAddress = groupAddress;
-         return this;
-      }
-
-      public UDPBroadcastEndpoint setGroupPort(int groupPort)
-      {
-         this.groupPort = groupPort;
-         return this;
-      }
-
-      public UDPBroadcastEndpoint setLocalBindAddress(InetAddress localAddress)
-      {
-         this.localAddress = localAddress;
-         return this;
-      }
-
-      public UDPBroadcastEndpoint setLocalBindPort(int localBindPort)
-      {
-         this.localBindPort = localBindPort;
-         return this;
-      }
-
-
-      public void broadcast(byte[] data) throws Exception
-      {
-         DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
-         broadcastingSocket.send(packet);
-      }
-
-      public byte[] receiveBroadcast() throws Exception
-      {
-         final byte[] data = new byte[65535];
-         final DatagramPacket packet = new DatagramPacket(data, data.length);
-
-         while (open)
-         {
-            try
-            {
-               receivingSocket.receive(packet);
-            }
-            // TODO: Do we need this?
-            catch (InterruptedIOException e)
-            {
-               continue;
-            }
-            catch (IOException e)
-            {
-               if (open)
-               {
-                  ActiveMQClientLogger.LOGGER.warn(this + " getting exception when receiving broadcasting.", e);
-               }
-            }
-            break;
-         }
-         return data;
-      }
-
-      public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
-      {
-         // We just use the regular method on UDP, there's no timeout support
-         // and this is basically for tests only
-         return receiveBroadcast();
-      }
-
-      public void openBroadcaster() throws Exception
-      {
-         if (localBindPort != -1)
-         {
-            broadcastingSocket = new DatagramSocket(localBindPort, localAddress);
-         }
-         else
-         {
-            if (localAddress != null)
-            {
-               ActiveMQClientLogger.LOGGER.broadcastGroupBindError();
-            }
-            broadcastingSocket = new DatagramSocket();
-         }
-
-         open = true;
-      }
-
-      public void openClient() throws Exception
-      {
-         // HORNETQ-874
-         if (checkForLinux() || checkForSolaris() || checkForHp())
-         {
-            try
-            {
-               receivingSocket = new MulticastSocket(new InetSocketAddress(groupAddress, groupPort));
-            }
-            catch (IOException e)
-            {
-               ActiveMQClientLogger.LOGGER.ioDiscoveryError(groupAddress.getHostAddress(), groupAddress instanceof Inet4Address ? "IPv4" : "IPv6");
-
-               receivingSocket = new MulticastSocket(groupPort);
-            }
-         }
-         else
-         {
-            receivingSocket = new MulticastSocket(groupPort);
-         }
-
-         if (localAddress != null)
-         {
-            receivingSocket.setInterface(localAddress);
-         }
-
-         receivingSocket.joinGroup(groupAddress);
-
-         receivingSocket.setSoTimeout(SOCKET_TIMEOUT);
-
-         open = true;
-      }
-
-      //@Todo: using isBroadcast to share endpoint between broadcast and receiving
-      public void close(boolean isBroadcast) throws Exception
-      {
-         open = false;
-
-         if (broadcastingSocket != null)
-         {
-            broadcastingSocket.close();
-         }
-
-         if (receivingSocket != null)
-         {
-            receivingSocket.close();
-         }
-      }
-
-      private static boolean checkForLinux()
-      {
-         return checkForPresence("os.name", "linux");
-      }
-
-      private static boolean checkForHp()
-      {
-         return checkForPresence("os.name", "hp");
-      }
-
-      private static boolean checkForSolaris()
-      {
-         return checkForPresence("os.name", "sun");
-      }
-
-      private static boolean checkForPresence(String key, String value)
-      {
-         try
-         {
-            String tmp = System.getProperty(key);
-            return tmp != null && tmp.trim().toLowerCase().startsWith(value);
-         }
-         catch (Throwable t)
-         {
-            return false;
-         }
-      }
-
-   }
-
-   @Override
-   public int hashCode()
-   {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((groupAddress == null) ? 0 : groupAddress.hashCode());
-      result = prime * result + groupPort;
-      return result;
-   }
-
-   @Override
-   public boolean equals(Object obj)
-   {
-      if (this == obj)
-         return true;
-      if (obj == null)
-         return false;
-      if (getClass() != obj.getClass())
-         return false;
-      UDPBroadcastGroupConfiguration other = (UDPBroadcastGroupConfiguration) obj;
-      if (groupAddress == null)
-      {
-         if (other.groupAddress != null)
-            return false;
-      }
-      else if (!groupAddress.equals(other.groupAddress))
-         return false;
-      if (groupPort != other.groupPort)
-         return false;
-      return true;
-   }
-}


Mime
View raw message