activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [32/52] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 Rename HornetQ* classes to ActiveMQ*
Date Tue, 18 Nov 2014 23:38:25 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java
index 79c81d3..3f14f55 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/JMSManagementService.java
@@ -14,9 +14,9 @@ package org.apache.activemq.jms.server.management;
 
 import org.apache.activemq.api.jms.management.JMSServerControl;
 import org.apache.activemq.core.server.Queue;
-import org.apache.activemq.jms.client.HornetQConnectionFactory;
-import org.apache.activemq.jms.client.HornetQQueue;
-import org.apache.activemq.jms.client.HornetQTopic;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.jms.client.ActiveMQQueue;
+import org.apache.activemq.jms.client.ActiveMQTopic;
 import org.apache.activemq.jms.server.JMSServerManager;
 import org.apache.activemq.jms.server.config.ConnectionFactoryConfiguration;
 
@@ -31,15 +31,15 @@ public interface JMSManagementService
 
    void unregisterJMSServer() throws Exception;
 
-   void registerQueue(HornetQQueue queue, Queue serverQueue) throws Exception;
+   void registerQueue(ActiveMQQueue queue, Queue serverQueue) throws Exception;
 
    void unregisterQueue(String name) throws Exception;
 
-   void registerTopic(HornetQTopic topic) throws Exception;
+   void registerTopic(ActiveMQTopic topic) throws Exception;
 
    void unregisterTopic(String name) throws Exception;
 
-   void registerConnectionFactory(String name, ConnectionFactoryConfiguration config, HornetQConnectionFactory connectionFactory) throws Exception;
+   void registerConnectionFactory(String name, ConnectionFactoryConfiguration config, ActiveMQConnectionFactory connectionFactory) throws Exception;
 
    void unregisterConnectionFactory(String name) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java
index 151a20d..650f97e 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/management/impl/JMSManagementServiceImpl.java
@@ -23,12 +23,12 @@ import org.apache.activemq.api.jms.management.JMSServerControl;
 import org.apache.activemq.api.jms.management.TopicControl;
 import org.apache.activemq.core.messagecounter.MessageCounter;
 import org.apache.activemq.core.messagecounter.MessageCounterManager;
-import org.apache.activemq.core.server.HornetQServer;
+import org.apache.activemq.core.server.ActiveMQServer;
 import org.apache.activemq.core.server.Queue;
 import org.apache.activemq.core.server.management.ManagementService;
-import org.apache.activemq.jms.client.HornetQConnectionFactory;
-import org.apache.activemq.jms.client.HornetQQueue;
-import org.apache.activemq.jms.client.HornetQTopic;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.jms.client.ActiveMQQueue;
+import org.apache.activemq.jms.client.ActiveMQTopic;
 import org.apache.activemq.jms.management.impl.JMSConnectionFactoryControlImpl;
 import org.apache.activemq.jms.management.impl.JMSQueueControlImpl;
 import org.apache.activemq.jms.management.impl.JMSServerControlImpl;
@@ -54,7 +54,7 @@ public class JMSManagementServiceImpl implements JMSManagementService
 
    // Static --------------------------------------------------------
 
-   public JMSManagementServiceImpl(final ManagementService managementService, final HornetQServer server, final JMSServerManager jmsServerManager)
+   public JMSManagementServiceImpl(final ManagementService managementService, final ActiveMQServer server, final JMSServerManager jmsServerManager)
    {
       this.managementService = managementService;
       this.jmsServerManager = jmsServerManager;
@@ -80,7 +80,7 @@ public class JMSManagementServiceImpl implements JMSManagementService
       managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
    }
 
-   public synchronized void registerQueue(final HornetQQueue queue, final Queue serverQueue) throws Exception
+   public synchronized void registerQueue(final ActiveMQQueue queue, final Queue serverQueue) throws Exception
    {
       QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
       MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
@@ -104,7 +104,7 @@ public class JMSManagementServiceImpl implements JMSManagementService
       managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
    }
 
-   public synchronized void registerTopic(final HornetQTopic topic) throws Exception
+   public synchronized void registerTopic(final ActiveMQTopic topic) throws Exception
    {
       ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName());
       AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
@@ -122,7 +122,7 @@ public class JMSManagementServiceImpl implements JMSManagementService
 
    public synchronized void registerConnectionFactory(final String name,
                                                       final ConnectionFactoryConfiguration cfConfig,
-                                                      final HornetQConnectionFactory connectionFactory) throws Exception
+                                                      final ActiveMQConnectionFactory connectionFactory) throws Exception
    {
       ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
       JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(cfConfig, connectionFactory, jmsServerManager, name);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
new file mode 100644
index 0000000..797fe83
--- /dev/null
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRecoveryRegistry.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.server.recovery;
+
+import javax.transaction.xa.XAResource;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.api.core.Pair;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
+import org.jboss.tm.XAResourceRecovery;
+
+/**
+ * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p>
+ * <p>Each outbound or inbound connection will pass its configuration through here by calling the method {@link ActiveMQRecoveryRegistry#register(XARecoveryConfig)}</p>
+ * <p>Later the {@link RecoveryDiscovery} will call {@link ActiveMQRecoveryRegistry#nodeUp(String, Pair, String, String)}
+ * so we will keep track of the nodes on the cluster
+ * or the nodes this server is connected to. </p>
+ *
+ * @author clebertsuconic
+ */
+public class ActiveMQRecoveryRegistry implements XAResourceRecovery
+{
+
+   private static final ActiveMQRecoveryRegistry theInstance = new ActiveMQRecoveryRegistry();
+
+   private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>();
+
+   /**
+    * The map from server id to resource adapter wrapper, which is what will actually be calling recovery.
+    * This will be returned by getXAResources
+    */
+   private final ConcurrentHashMap<String, ActiveMQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, ActiveMQXAResourceWrapper>();
+
+   /**
+    * In case of failures, we retry on the next getXAResources
+    */
+   private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>();
+
+   private ActiveMQRecoveryRegistry()
+   {
+   }
+
+   /**
+    * This will be called periodically by the Transaction Manager
+    */
+   public XAResource[] getXAResources()
+   {
+      try
+      {
+         checkFailures();
+
+         ActiveMQXAResourceWrapper[] resourceArray = new ActiveMQXAResourceWrapper[recoveries.size()];
+         resourceArray = recoveries.values().toArray(resourceArray);
+
+         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            ActiveMQJMSServerLogger.LOGGER.debug("\n=======================================================================================");
+            ActiveMQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:");
+            for (Map.Entry<String, ActiveMQXAResourceWrapper> entry : recoveries.entrySet())
+            {
+               ActiveMQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue());
+            }
+            ActiveMQJMSServerLogger.LOGGER.debug("=======================================================================================\n");
+         }
+
+         return resourceArray;
+      }
+      catch (Throwable e)
+      {
+         ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
+         return new XAResource[]{};
+      }
+   }
+
+   public static ActiveMQRecoveryRegistry getInstance()
+   {
+      return theInstance;
+   }
+
+   /**
+    * This will be called by the resource adapters to register a new discovery
+    *
+    * @param resourceConfig
+    */
+   public void register(final XARecoveryConfig resourceConfig)
+   {
+      RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig);
+      RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance);
+      if (discoveryRecord == null)
+      {
+         discoveryRecord = newInstance;
+         discoveryRecord.start(false);
+      }
+      // you could have a configuration shared with multiple MDBs or RAs
+      discoveryRecord.incrementUsage();
+   }
+
+   /**
+    * Reference-counts and deactivates a configuration
+    * Notice: this won't remove the servers since a server may have previous XIDs
+    *
+    * @param resourceConfig
+    */
+   public void unRegister(final XARecoveryConfig resourceConfig)
+   {
+      RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig);
+      if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0)
+      {
+         discoveryRecord = configSet.remove(resourceConfig);
+         if (discoveryRecord != null)
+         {
+            discoveryRecord.stop();
+         }
+      }
+   }
+
+   /**
+    * We need to make sure that all resources are closed. We don't actually do this when a resourceConfig is closed, but
+    * maybe we should.
+    */
+   public void stop()
+   {
+      for (RecoveryDiscovery recoveryDiscovery : configSet.values())
+      {
+         recoveryDiscovery.stop();
+      }
+      for (ActiveMQXAResourceWrapper activeMQXAResourceWrapper : recoveries.values())
+      {
+         activeMQXAResourceWrapper.close();
+      }
+      recoveries.clear();
+      configSet.clear();
+   }
+
+   /**
+    * In case of a failure the Discovery will register itself to retry
+    *
+    * @param failedDiscovery
+    */
+   public void failedDiscovery(RecoveryDiscovery failedDiscovery)
+   {
+      ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery);
+      synchronized (failedDiscoverySet)
+      {
+         failedDiscoverySet.add(failedDiscovery);
+      }
+   }
+
+   /**
+    * @param nodeID
+    * @param networkConfiguration
+    * @param username
+    * @param password
+    */
+   public void nodeUp(String nodeID,
+                      Pair<TransportConfiguration, TransportConfiguration> networkConfiguration,
+                      String username,
+                      String password)
+   {
+
+      if (recoveries.get(nodeID) == null)
+      {
+         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            ActiveMQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration);
+         }
+         XARecoveryConfig config = new XARecoveryConfig(true,
+                                                        extractTransportConfiguration(networkConfiguration),
+                                                        username,
+                                                        password);
+
+         ActiveMQXAResourceWrapper wrapper = new ActiveMQXAResourceWrapper(config);
+         recoveries.putIfAbsent(nodeID, wrapper);
+      }
+   }
+
+   public void nodeDown(String nodeID)
+   {
+   }
+
+   /**
+    * this will go through the list of retries
+    */
+   private void checkFailures()
+   {
+      final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>();
+
+      // it will transfer all the discoveries to a new collection
+      synchronized (failedDiscoverySet)
+      {
+         failures.addAll(failedDiscoverySet);
+         failedDiscoverySet.clear();
+      }
+
+      if (failures.size() > 0)
+      {
+         // This shouldn't happen on a regular scenario, however when this retry happens this needs
+         // to be done on a new thread
+         Thread t = new Thread("ActiveMQ Recovery Discovery Reinitialization")
+         {
+            @Override
+            public void run()
+            {
+               for (RecoveryDiscovery discovery : failures)
+               {
+                  try
+                  {
+                     ActiveMQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery);
+                     discovery.start(true);
+                  }
+                  catch (Throwable e)
+                  {
+                     ActiveMQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
+                  }
+               }
+            }
+         };
+
+         t.start();
+      }
+   }
+
+   /**
+    * @param networkConfiguration
+    * @return
+    */
+   private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration)
+   {
+      if (networkConfiguration.getB() != null)
+      {
+         return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()};
+      }
+      return new TransportConfiguration[]{networkConfiguration.getA()};
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
new file mode 100644
index 0000000..d232831
--- /dev/null
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQRegistryBase.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.server.recovery;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jboss.tm.XAResourceRecoveryRegistry;
+
+/**
+ * This class is a base class for the integration layer.
+ * It is used on integration points and is just a bridge to the real registry at
+ * {@link ActiveMQRecoveryRegistry}
+ *
+ * @author Clebert
+ *
+ *
+ */
+public abstract class ActiveMQRegistryBase
+{
+
+   private final AtomicBoolean started = new AtomicBoolean(false);
+
+   public ActiveMQRegistryBase()
+   {
+   }
+
+
+   public abstract XAResourceRecoveryRegistry getTMRegistry();
+
+   public void register(final XARecoveryConfig resourceConfig)
+   {
+      init();
+      ActiveMQRecoveryRegistry.getInstance().register(resourceConfig);
+   }
+
+
+
+   public void unRegister(final XARecoveryConfig resourceConfig)
+   {
+      init();
+      ActiveMQRecoveryRegistry.getInstance().unRegister(resourceConfig);
+   }
+
+   public void stop()
+   {
+      if (started.compareAndSet(true, false) && getTMRegistry() != null)
+      {
+         getTMRegistry().removeXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
+         ActiveMQRecoveryRegistry.getInstance().stop();
+      }
+   }
+
+   private void init()
+   {
+      if (started.compareAndSet(false, true) && getTMRegistry() != null)
+      {
+         getTMRegistry().addXAResourceRecovery(ActiveMQRecoveryRegistry.getInstance());
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
new file mode 100644
index 0000000..1cd59f4
--- /dev/null
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceRecovery.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.server.recovery;
+
+import javax.transaction.xa.XAResource;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.arjuna.ats.jta.recovery.XAResourceRecovery;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
+
+/**
+ * A XAResourceRecovery instance that can be used to recover any JMS provider.
+ * <p>
+ * In reality only recover, rollback and commit will be called but we still need to implement all
+ * methods just in case.
+ * <p>
+ * To enable this add the following to the jbossts-properties file
+ * <pre>
+ * &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ1"
+ *                 value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/&gt;
+ * </pre>
+ * <p>
+ * you'll need something like this if the ActiveMQ Server is remote
+ * <pre>
+ *      &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
+ *                  value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/&gt;
+ * </pre>
+ * <p>
+ * you'll need something like this if the ActiveMQ Server is remote and has failover configured
+ * <pre>
+ *             &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.ACTIVEMQ2"
+ *                       value="org.apache.activemq.jms.server.recovery.ActiveMQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/&gt;
+ * </pre>
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ */
+public class ActiveMQXAResourceRecovery implements XAResourceRecovery
+{
+   private final boolean trace = ActiveMQJMSServerLogger.LOGGER.isTraceEnabled();
+
+   private boolean hasMore;
+
+   private ActiveMQXAResourceWrapper res;
+
+   public ActiveMQXAResourceRecovery()
+   {
+      if (trace)
+      {
+         ActiveMQJMSServerLogger.LOGGER.trace("Constructing ActiveMQXAResourceRecovery");
+      }
+   }
+
+   public boolean initialise(final String config)
+   {
+      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQJMSServerLogger.LOGGER.trace(this + " intialise: " + config);
+      }
+
+      String[] configs = config.split(";");
+      XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
+      for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+      {
+         String s = configs[i];
+         ConfigParser parser = new ConfigParser(s);
+         String connectorFactoryClassName = parser.getConnectorFactoryClassName();
+         Map<String, Object> connectorParams = parser.getConnectorParameters();
+         String username = parser.getUsername();
+         String password = parser.getPassword();
+         TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
+         xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password);
+      }
+
+
+      res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);
+
+      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQJMSServerLogger.LOGGER.trace(this + " initialised");
+      }
+
+      return true;
+   }
+
+   public boolean hasMoreResources()
+   {
+      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQJMSServerLogger.LOGGER.trace(this + " hasMoreResources");
+      }
+
+      /*
+       * The way hasMoreResources is supposed to work is as follows:
+       * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
+       * true it will call getXAResource.
+       * It will repeat that until hasMoreResources returns false.
+       * Then the sweep is over.
+       * For the next sweep hasMoreResources should return true, etc.
+       *
+       * In our case where we only need to return one XAResource per sweep,
+       * hasMoreResources should basically alternate between true and false.
+       *
+       *
+       */
+
+      hasMore = !hasMore;
+
+      return hasMore;
+   }
+
+   public XAResource getXAResource()
+   {
+      if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         ActiveMQJMSServerLogger.LOGGER.trace(this + " getXAResource");
+      }
+
+      return res;
+   }
+
+   public XAResource[] getXAResources()
+   {
+      return new XAResource[]{res};
+   }
+
+   @Override
+   protected void finalize()
+   {
+      res.close();
+   }
+
+   public static class ConfigParser
+   {
+      private final String connectorFactoryClassName;
+
+      private final Map<String, Object> connectorParameters;
+
+      private String username;
+
+      private String password;
+
+      public ConfigParser(final String config)
+      {
+         if (config == null || config.length() == 0)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         String[] strings = config.split(",");
+
+         // First (mandatory) param is the connector factory class name
+         if (strings.length < 1)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         connectorFactoryClassName = strings[0].trim();
+
+         // Next two (optional) parameters are the username and password to use for creating the session for recovery
+
+         if (strings.length >= 2)
+         {
+
+            username = strings[1].trim();
+            if (username.length() == 0)
+            {
+               username = null;
+            }
+
+            if (strings.length == 2)
+            {
+               throw new IllegalArgumentException("If username is specified, password must be specified too");
+            }
+
+            password = strings[2].trim();
+            if (password.length() == 0)
+            {
+               password = null;
+            }
+         }
+
+         // other tokens are for connector configurations
+         connectorParameters = new HashMap<String, Object>();
+         if (strings.length >= 3)
+         {
+            for (int i = 3; i < strings.length; i++)
+            {
+               String[] str = strings[i].split("=");
+               if (str.length == 2)
+               {
+                  connectorParameters.put(str[0].trim(), str[1].trim());
+               }
+            }
+         }
+      }
+
+      public String getConnectorFactoryClassName()
+      {
+         return connectorFactoryClassName;
+      }
+
+      public Map<String, Object> getConnectorParameters()
+      {
+         return connectorParameters;
+      }
+
+      public String getUsername()
+      {
+         return username;
+      }
+
+      public String getPassword()
+      {
+         return password;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
new file mode 100644
index 0000000..04a7457
--- /dev/null
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/ActiveMQXAResourceWrapper.java
@@ -0,0 +1,531 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.jms.server.recovery;
+
+import java.util.Arrays;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.ActiveMQExceptionType;
+import org.apache.activemq.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.core.client.SessionFailureListener;
+import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
+
+/**
+ * XAResourceWrapper.
+ *
+ * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
+ *
+ * We don't use that class directly because it assumes that XAER_RMFAIL or XAER_RMERR is thrown when the
+ * connection fails, whereas in ActiveMQ we throw XA_RETRY so that the recovery manager is able
+ * to retry on failure without anyone having to retry manually
+ *
+ * @author <a href="adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *
+ * @version $Revision: 45341 $
+ */
+public class ActiveMQXAResourceWrapper implements XAResource, SessionFailureListener
+{
+   /** The state lock */
+   private static final Object lock = new Object();
+
+   private ServerLocator serverLocator;
+
+   private ClientSessionFactory csf;
+
+   private ClientSession delegate;
+
+   private XARecoveryConfig[] xaRecoveryConfigs;
+
+   /**
+    * Creates a wrapper for the supplied recovery configurations.
+    * <p>
+    * Nothing is connected here: the array is stored as given (not copied) and, when debug logging is
+    * enabled, written to the log together with the identity hash of this instance.
+    *
+    * @param xaRecoveryConfigs the recovery configurations to keep
+    */
+   public ActiveMQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
+   {
+      this.xaRecoveryConfigs = xaRecoveryConfigs;
+
+      if (!ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         return;
+      }
+
+      final String configs = Arrays.toString(xaRecoveryConfigs);
+      final int instanceId = System.identityHashCode(this);
+      ActiveMQJMSServerLogger.LOGGER.debug("Recovery configured with " + configs + ", instance=" + instanceId);
+   }
+
+   /**
+    * Asks the delegate for the transaction branches that need recovery.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(false)}. A failure reported by the delegate is logged
+    * and then handed to {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param flag the recovery scan flag, passed through to the delegate unchanged
+    * @return whatever the delegate returned (possibly {@code null} or empty)
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public Xid[] recover(final int flag) throws XAException
+   {
+      final XAResource target = getDelegate(false);
+
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         final String configs = Arrays.toString(this.xaRecoveryConfigs);
+         ActiveMQJMSServerLogger.LOGGER.debug("looking for recover at " + target + " configuration " + configs);
+      }
+
+      try
+      {
+         final Xid[] recovered = target.recover(flag);
+
+         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            // only worth a log line when something was actually found
+            if (recovered != null && recovered.length != 0)
+            {
+               ActiveMQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(recovered) + " at " + this);
+            }
+         }
+
+         return recovered;
+      }
+      catch (XAException e)
+      {
+         ActiveMQJMSServerLogger.LOGGER.xaRecoverError(e);
+         throw check(e);
+      }
+   }
+
+   /**
+    * Commits the given transaction branch on the delegate.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(true)}, i.e. an unavailable connection is reported
+    * with the {@code XA_RETRY} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param xid the transaction branch to commit
+    * @param onePhase passed through to the delegate unchanged
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public void commit(final Xid xid, final boolean onePhase) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // the xid itself was missing from this message, leaving a dangling " xid " label
+         ActiveMQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + xid + " onePhase=" + onePhase);
+      }
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Rolls back the given transaction branch on the delegate.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(true)}, i.e. an unavailable connection is reported
+    * with the {@code XA_RETRY} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param xid the transaction branch to roll back
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public void rollback(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // include the xid: the message previously ended with a dangling " xid " label
+         ActiveMQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid " + xid);
+      }
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Tells the delegate to forget the given transaction branch.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(false)}, i.e. an unavailable connection is reported
+    * with the {@code XAER_RMERR} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param xid the transaction branch to forget
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public void forget(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // include the xid: the message previously ended with a dangling " xid " label
+         ActiveMQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid " + xid);
+      }
+
+      try
+      {
+         xaResource.forget(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Compares the resource manager behind this wrapper with the one behind the given resource.
+    * <p>
+    * When the argument is itself an {@code ActiveMQXAResourceWrapper} it is unwrapped first, so that the
+    * two underlying delegates are compared rather than the wrappers.
+    *
+    * @param xaRes the resource to compare against
+    * @return the answer given by this wrapper's delegate
+    * @throws XAException if either delegate could not be obtained or the comparison failed
+    */
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      // resolve the other side first, exactly one lookup per wrapper
+      final XAResource other = xaRes instanceof ActiveMQXAResourceWrapper
+         ? ((ActiveMQXAResourceWrapper)xaRes).getDelegate(false)
+         : xaRes;
+
+      final XAResource own = getDelegate(false);
+      try
+      {
+         return own.isSameRM(other);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Prepares the given transaction branch on the delegate.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(true)}, i.e. an unavailable connection is reported
+    * with the {@code XA_RETRY} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param xid the transaction branch to prepare
+    * @return the vote returned by the delegate
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public int prepare(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // include the xid: the message previously ended with a dangling " xid " label
+         ActiveMQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid " + xid);
+      }
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Starts work on behalf of the given transaction branch on the delegate.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(false)}, i.e. an unavailable connection is reported
+    * with the {@code XAER_RMERR} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param xid the transaction branch
+    * @param flags passed through to the delegate unchanged
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public void start(final Xid xid, final int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // include the xid: the message previously ended with a dangling " xid " label
+         ActiveMQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid " + xid);
+      }
+      try
+      {
+         xaResource.start(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Ends work on behalf of the given transaction branch on the delegate.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(false)}, i.e. an unavailable connection is reported
+    * with the {@code XAER_RMERR} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param xid the transaction branch
+    * @param flags passed through to the delegate unchanged
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public void end(final Xid xid, final int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // include the xid: the message previously ended with a dangling " xid " label
+         ActiveMQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid " + xid);
+      }
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Reads the transaction timeout from the delegate.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(false)}, i.e. an unavailable connection is reported
+    * with the {@code XAER_RMERR} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @return the timeout value returned by the delegate
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public int getTransactionTimeout() throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // no xid is involved here; the copy-pasted " xid " suffix was dropped from the message
+         ActiveMQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource);
+      }
+      try
+      {
+         return xaResource.getTransactionTimeout();
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Sets the transaction timeout on the delegate.
+    * <p>
+    * The delegate is obtained with {@code getDelegate(false)}, i.e. an unavailable connection is reported
+    * with the {@code XAER_RMERR} error code. A failure reported by the delegate goes through
+    * {@link #check(XAException)}, which closes the connection and rethrows it.
+    *
+    * @param seconds the timeout, passed through to the delegate unchanged
+    * @return the answer given by the delegate
+    * @throws XAException if no delegate could be obtained or the delegate failed
+    */
+   public boolean setTransactionTimeout(final int seconds) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         // no xid is involved here; log the value being set instead of the copy-pasted " xid " suffix
+         ActiveMQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " seconds " + seconds);
+      }
+      try
+      {
+         return xaResource.setTransactionTimeout(seconds);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Reacts to a failed connection by logging the failure and closing this wrapper's resources.
+    * <p>
+    * A {@code DISCONNECTED} failure is only logged at debug level; any other failure type is reported
+    * through {@code xaRecoverConnectionError}. {@link #close()} is called in both cases.
+    *
+    * @param me the failure that was reported
+    * @param failedOver not used by this implementation
+    */
+   public void connectionFailed(final ActiveMQException me, boolean failedOver)
+   {
+      final boolean disconnected = me.getType() == ActiveMQExceptionType.DISCONNECTED;
+
+      if (!disconnected)
+      {
+         ActiveMQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf);
+      }
+      else if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me);
+      }
+
+      close();
+   }
+
+   /**
+    * Delegates to {@link #connectionFailed(ActiveMQException, boolean)}.
+    *
+    * @param me the failure that was reported
+    * @param failedOver forwarded unchanged
+    * @param scaleDownTargetNodeID ignored by this implementation
+    */
+   @Override
+   public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
+   {
+      this.connectionFailed(me, failedOver);
+   }
+
+   /**
+    * Intentionally empty: this wrapper takes no action before a reconnect.
+    *
+    * @param me the failure that triggered the reconnect (unused)
+    */
+   public void beforeReconnect(final ActiveMQException me)
+   {
+      // nothing to do
+   }
+
+   /**
+    * Returns the XAResource to delegate to, connecting first if necessary.
+    * <p>
+    * Any exception raised by {@link #connect()} is captured and attached as the cause of the
+    * {@code XAException} thrown from here.
+    *
+    * @param retry when {@code true} a missing connection is reported as {@code XA_RETRY}, otherwise as
+    *           {@code XAER_RMERR}
+    * @return the delegate, never {@code null}
+    * @throws XAException when no delegate could be obtained
+    */
+   private XAResource getDelegate(boolean retry) throws XAException
+   {
+      Exception failure = null;
+      XAResource resource = null;
+      try
+      {
+         resource = connect();
+      }
+      catch (Exception e)
+      {
+         failure = e;
+      }
+
+      if (resource != null)
+      {
+         return resource;
+      }
+
+      // we should always throw a retry for certain methods (commit etc.); if not, the tx is marked as a
+      // heuristic and all chaos is let loose
+      final XAException xae = retry
+         ? new XAException("Connection unavailable for xa recovery")
+         : new XAException("Error trying to connect to any providers for xa recovery");
+      xae.errorCode = retry ? XAException.XA_RETRY : XAException.XAER_RMERR;
+
+      if (failure != null)
+      {
+         xae.initCause(failure);
+      }
+      if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         ActiveMQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
+      }
+      throw xae;
+   }
+
+   /**
+    * Connect to the server if not already done so.
+    * <p>
+    * Returns the current session when there is one. Otherwise every non-null recovery configuration is
+    * tried in order and the first session that can be created is kept as the delegate and registered
+    * with this wrapper as failure listener. A configuration that fails is logged, the resources opened
+    * for it are closed on a best-effort basis, and the next configuration is tried.
+    *
+    * @return the session acting as XAResource
+    * @throws Exception an {@link ActiveMQNotConnectedException} when none of the configurations yields a
+    *            session
+    */
+   protected XAResource connect() throws Exception
+   {
+      // Do we already have a valid connectionFactory?
+      synchronized (ActiveMQXAResourceWrapper.lock)
+      {
+         if (delegate != null)
+         {
+            return delegate;
+         }
+      }
+
+      for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
+      {
+
+         if (xaRecoveryConfig == null)
+         {
+            continue;
+         }
+         if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            ActiveMQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
+         }
+
+         ClientSession cs = null;
+
+         try
+         {
+            // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions
+            // we really need to control what server it's connected to
+
+            // Manual configuration may still use discovery, so we will keep this
+            if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
+            {
+               serverLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration());
+            }
+            else
+            {
+               serverLocator = ActiveMQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig());
+            }
+            serverLocator.disableFinalizeCheck();
+            csf = serverLocator.createSessionFactory();
+            if (xaRecoveryConfig.getUsername() == null)
+            {
+               cs = csf.createSession(true, false, false);
+            }
+            else
+            {
+               cs = csf.createSession(xaRecoveryConfig.getUsername(),
+                  xaRecoveryConfig.getPassword(),
+                  true,
+                  false,
+                  false,
+                  false,
+                  1);
+            }
+         }
+         catch (Throwable e)
+         {
+            ActiveMQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
+            if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQJMSServerLogger.LOGGER.debug(e.getMessage(), e);
+            }
+
+            // Best-effort cleanup. Each resource is closed in its own try block so that a failure
+            // closing the session cannot prevent the locator from being closed.
+            try
+            {
+               if (cs != null)
+               {
+                  cs.close();
+               }
+            }
+            catch (Throwable ignored)
+            {
+               if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
+               {
+                  // log the cleanup failure's own message, not the message of the connection error
+                  ActiveMQJMSServerLogger.LOGGER.trace(ignored.getMessage(), ignored);
+               }
+            }
+
+            try
+            {
+               if (serverLocator != null)
+               {
+                  serverLocator.close();
+               }
+            }
+            catch (Throwable ignored)
+            {
+               if (ActiveMQJMSServerLogger.LOGGER.isTraceEnabled())
+               {
+                  ActiveMQJMSServerLogger.LOGGER.trace(ignored.getMessage(), ignored);
+               }
+            }
+            continue;
+         }
+
+         cs.addFailureListener(this);
+
+         synchronized (ActiveMQXAResourceWrapper.lock)
+         {
+            delegate = cs;
+         }
+
+         return delegate;
+      }
+      ActiveMQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
+      throw new ActiveMQNotConnectedException();
+   }
+
+   /**
+    * @return a description listing the server locator, session factory, delegate, recovery
+    *         configurations and the identity hash of this instance
+    */
+   @Override
+   public String toString()
+   {
+      final StringBuilder text = new StringBuilder("ActiveMQXAResourceWrapper [serverLocator=");
+      text.append(serverLocator);
+      text.append(", csf=").append(csf);
+      text.append(", delegate=").append(delegate);
+      text.append(", xaRecoveryConfigs=").append(Arrays.toString(xaRecoveryConfigs));
+      text.append(", instance=").append(System.identityHashCode(this));
+      text.append("]");
+      return text.toString();
+   }
+
+   /**
+    * Close the connection.
+    * <p>
+    * The session, session factory and server locator references are detached from this wrapper while
+    * holding the lock and then closed, in that order, outside of it. A failure while closing one of them
+    * is logged at debug level and does not stop the remaining ones from being closed.
+    */
+   public void close()
+   {
+      final ClientSession session;
+      final ClientSessionFactory factory;
+      final ServerLocator locator;
+
+      // take ownership of the current resources and reset the fields in one step
+      synchronized (ActiveMQXAResourceWrapper.lock)
+      {
+         factory = csf;
+         csf = null;
+         session = delegate;
+         delegate = null;
+         locator = serverLocator;
+         serverLocator = null;
+      }
+
+      if (session != null)
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Throwable failure)
+         {
+            ActiveMQJMSServerLogger.LOGGER.debug(failure.getMessage(), failure);
+         }
+      }
+
+      if (factory != null)
+      {
+         try
+         {
+            factory.close();
+         }
+         catch (Throwable failure)
+         {
+            ActiveMQJMSServerLogger.LOGGER.debug(failure.getMessage(), failure);
+         }
+      }
+
+      if (locator != null)
+      {
+         try
+         {
+            locator.close();
+         }
+         catch (Throwable failure)
+         {
+            ActiveMQJMSServerLogger.LOGGER.debug(failure.getMessage(), failure);
+         }
+      }
+   }
+
+   /**
+    * Logs the given XAException, closes the current connection and rethrows the exception.
+    * <p>
+    * The connection is closed for every exception passed in, not only for resource manager
+    * problems, so that the next call reconnects.
+    *
+    * @param e the xa exception
+    * @return never returns normally; the return type only allows callers to write {@code throw check(e)}
+    * @throws XAException always, the same instance that was passed in
+    */
+   protected XAException check(final XAException e) throws XAException
+   {
+      ActiveMQJMSServerLogger.LOGGER.xaRecoveryError(e);
+
+
+      // If any exception happened, we close the connection so we may start fresh
+      close();
+      throw e;
+   }
+
+   /**
+    * Closes this wrapper's resources when the instance is finalized.
+    *
+    * @throws Throwable declared by the overridden method
+    */
+   @Override
+   protected void finalize() throws Throwable
+   {
+      this.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java
deleted file mode 100644
index d9b5c86..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRecoveryRegistry.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import javax.transaction.xa.XAResource;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.activemq.api.core.Pair;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.jms.server.HornetQJMSServerLogger;
-import org.jboss.tm.XAResourceRecovery;
-
-/**
- * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p>
- * <p>Each outbound or inboud connection will pass the configuration here through by calling the method {@link HornetQRecoveryRegistry#register(XARecoveryConfig)}</p>
- * <p>Later the {@link RecoveryDiscovery} will call {@link HornetQRecoveryRegistry#nodeUp(String, Pair, String, String)}
- * so we will keep a track of nodes on the cluster
- * or nodes where this server is connected to. </p>
- *
- * @author clebertsuconic
- */
-public class HornetQRecoveryRegistry implements XAResourceRecovery
-{
-
-   private static final HornetQRecoveryRegistry theInstance = new HornetQRecoveryRegistry();
-
-   private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>();
-
-   /**
-    * The list by server id and resource adapter wrapper, what will actually be calling recovery.
-    * This will be returned by getXAResources
-    */
-   private final ConcurrentHashMap<String, HornetQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, HornetQXAResourceWrapper>();
-
-   /**
-    * In case of failures, we retry on the next getXAResources
-    */
-   private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>();
-
-   private HornetQRecoveryRegistry()
-   {
-   }
-
-   /**
-    * This will be called periodically by the Transaction Manager
-    */
-   public XAResource[] getXAResources()
-   {
-      try
-      {
-         checkFailures();
-
-         HornetQXAResourceWrapper[] resourceArray = new HornetQXAResourceWrapper[recoveries.size()];
-         resourceArray = recoveries.values().toArray(resourceArray);
-
-         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            HornetQJMSServerLogger.LOGGER.debug("\n=======================================================================================");
-            HornetQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:");
-            for (Map.Entry<String, HornetQXAResourceWrapper> entry : recoveries.entrySet())
-            {
-               HornetQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue());
-            }
-            HornetQJMSServerLogger.LOGGER.debug("=======================================================================================\n");
-         }
-
-         return resourceArray;
-      }
-      catch (Throwable e)
-      {
-         HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
-         return new XAResource[]{};
-      }
-   }
-
-   public static HornetQRecoveryRegistry getInstance()
-   {
-      return theInstance;
-   }
-
-   /**
-    * This will be called by then resource adapters, to register a new discovery
-    *
-    * @param resourceConfig
-    */
-   public void register(final XARecoveryConfig resourceConfig)
-   {
-      RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig);
-      RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance);
-      if (discoveryRecord == null)
-      {
-         discoveryRecord = newInstance;
-         discoveryRecord.start(false);
-      }
-      // you could have a configuration shared with multiple MDBs or RAs
-      discoveryRecord.incrementUsage();
-   }
-
-   /**
-    * Reference counts and deactivate a configuration
-    * Notice: this won't remove the servers since a server may have previous XIDs
-    *
-    * @param resourceConfig
-    */
-   public void unRegister(final XARecoveryConfig resourceConfig)
-   {
-      RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig);
-      if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0)
-      {
-         discoveryRecord = configSet.remove(resourceConfig);
-         if (discoveryRecord != null)
-         {
-            discoveryRecord.stop();
-         }
-      }
-   }
-
-   /**
-    * We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but
-    * maybe we should.
-    */
-   public void stop()
-   {
-      for (RecoveryDiscovery recoveryDiscovery : configSet.values())
-      {
-         recoveryDiscovery.stop();
-      }
-      for (HornetQXAResourceWrapper hornetQXAResourceWrapper : recoveries.values())
-      {
-         hornetQXAResourceWrapper.close();
-      }
-      recoveries.clear();
-      configSet.clear();
-   }
-
-   /**
-    * in case of a failure the Discovery will register itslef to retry
-    *
-    * @param failedDiscovery
-    */
-   public void failedDiscovery(RecoveryDiscovery failedDiscovery)
-   {
-      HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery);
-      synchronized (failedDiscoverySet)
-      {
-         failedDiscoverySet.add(failedDiscovery);
-      }
-   }
-
-   /**
-    * @param nodeID
-    * @param networkConfiguration
-    * @param username
-    * @param password
-    */
-   public void nodeUp(String nodeID,
-                      Pair<TransportConfiguration, TransportConfiguration> networkConfiguration,
-                      String username,
-                      String password)
-   {
-
-      if (recoveries.get(nodeID) == null)
-      {
-         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            HornetQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration);
-         }
-         XARecoveryConfig config = new XARecoveryConfig(true,
-                                                        extractTransportConfiguration(networkConfiguration),
-                                                        username,
-                                                        password);
-
-         HornetQXAResourceWrapper wrapper = new HornetQXAResourceWrapper(config);
-         recoveries.putIfAbsent(nodeID, wrapper);
-      }
-   }
-
-   public void nodeDown(String nodeID)
-   {
-   }
-
-   /**
-    * this will go through the list of retries
-    */
-   private void checkFailures()
-   {
-      final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>();
-
-      // it will transfer all the discoveries to a new collection
-      synchronized (failedDiscoverySet)
-      {
-         failures.addAll(failedDiscoverySet);
-         failedDiscoverySet.clear();
-      }
-
-      if (failures.size() > 0)
-      {
-         // This shouldn't happen on a regular scenario, however when this retry happens this needs
-         // to be done on a new thread
-         Thread t = new Thread("HornetQ Recovery Discovery Reinitialization")
-         {
-            @Override
-            public void run()
-            {
-               for (RecoveryDiscovery discovery : failures)
-               {
-                  try
-                  {
-                     HornetQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery);
-                     discovery.start(true);
-                  }
-                  catch (Throwable e)
-                  {
-                     HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
-                  }
-               }
-            }
-         };
-
-         t.start();
-      }
-   }
-
-   /**
-    * @param networkConfiguration
-    * @return
-    */
-   private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration)
-   {
-      if (networkConfiguration.getB() != null)
-      {
-         return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()};
-      }
-      return new TransportConfiguration[]{networkConfiguration.getA()};
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java
deleted file mode 100644
index c9381bb..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQRegistryBase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.jboss.tm.XAResourceRecoveryRegistry;
-
-/**
- * This class is a base class for the integration layer where
- * This class is used on integration points and this is just a bridge to the real registry at
- * {@link HornetQRecoveryRegistry}
- *
- * @author Clebert
- *
- *
- */
-public abstract class HornetQRegistryBase
-{
-
-   private final AtomicBoolean started = new AtomicBoolean(false);
-
-   public HornetQRegistryBase()
-   {
-   }
-
-
-   public abstract XAResourceRecoveryRegistry getTMRegistry();
-
-   public void register(final XARecoveryConfig resourceConfig)
-   {
-      init();
-      HornetQRecoveryRegistry.getInstance().register(resourceConfig);
-   }
-
-
-
-   public void unRegister(final XARecoveryConfig resourceConfig)
-   {
-      init();
-      HornetQRecoveryRegistry.getInstance().unRegister(resourceConfig);
-   }
-
-   public void stop()
-   {
-      if (started.compareAndSet(true, false) && getTMRegistry() != null)
-      {
-         getTMRegistry().removeXAResourceRecovery(HornetQRecoveryRegistry.getInstance());
-         HornetQRecoveryRegistry.getInstance().stop();
-      }
-   }
-
-   private void init()
-   {
-      if (started.compareAndSet(false, true) && getTMRegistry() != null)
-      {
-         getTMRegistry().addXAResourceRecovery(HornetQRecoveryRegistry.getInstance());
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java
deleted file mode 100644
index 448e6b1..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceRecovery.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import javax.transaction.xa.XAResource;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.arjuna.ats.jta.recovery.XAResourceRecovery;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.jms.server.HornetQJMSServerLogger;
-
-/**
- * A XAResourceRecovery instance that can be used to recover any JMS provider.
- * <p>
- * In reality only recover, rollback and commit will be called but we still need to be implement all
- * methods just in case.
- * <p>
- * To enable this add the following to the jbossts-properties file
- * <pre>
- * &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
- *                 value="org.apache.activemq.jms.server.recovery.HornetQXAResourceRecovery;org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory"/&gt;
- * </pre>
- * <p>
- * you'll need something like this if the HornetQ Server is remote
- * <pre>
- *      &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- *                  value="org.apache.activemq.jms.server.recovery.HornetQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/&gt;
- * </pre>
- * <p>
- * you'll need something like this if the HornetQ Server is remote and has failover configured
- * <pre>
- *             &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- *                       value="org.apache.activemq.jms.server.recovery.HornetQXAResourceRecovery;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/&gt;
- * </pre>
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @version <tt>$Revision: 1.1 $</tt>
- */
-public class HornetQXAResourceRecovery implements XAResourceRecovery
-{
-   private final boolean trace = HornetQJMSServerLogger.LOGGER.isTraceEnabled();
-
-   private boolean hasMore;
-
-   private HornetQXAResourceWrapper res;
-
-   public HornetQXAResourceRecovery()
-   {
-      if (trace)
-      {
-         HornetQJMSServerLogger.LOGGER.trace("Constructing HornetQXAResourceRecovery");
-      }
-   }
-
-   public boolean initialise(final String config)
-   {
-      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.trace(this + " intialise: " + config);
-      }
-
-      String[] configs = config.split(";");
-      XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
-      for (int i = 0, configsLength = configs.length; i < configsLength; i++)
-      {
-         String s = configs[i];
-         ConfigParser parser = new ConfigParser(s);
-         String connectorFactoryClassName = parser.getConnectorFactoryClassName();
-         Map<String, Object> connectorParams = parser.getConnectorParameters();
-         String username = parser.getUsername();
-         String password = parser.getPassword();
-         TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
-         xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password);
-      }
-
-
-      res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
-
-      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.trace(this + " initialised");
-      }
-
-      return true;
-   }
-
-   public boolean hasMoreResources()
-   {
-      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.trace(this + " hasMoreResources");
-      }
-
-      /*
-       * The way hasMoreResources is supposed to work is as follows:
-       * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
-       * true it will call getXAResource.
-       * It will repeat that until hasMoreResources returns false.
-       * Then the sweep is over.
-       * For the next sweep hasMoreResources should return true, etc.
-       *
-       * In our case where we only need to return one XAResource per sweep,
-       * hasMoreResources should basically alternate between true and false.
-       *
-       *
-       */
-
-      hasMore = !hasMore;
-
-      return hasMore;
-   }
-
-   public XAResource getXAResource()
-   {
-      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.trace(this + " getXAResource");
-      }
-
-      return res;
-   }
-
-   public XAResource[] getXAResources()
-   {
-      return new XAResource[]{res};
-   }
-
-   @Override
-   protected void finalize()
-   {
-      res.close();
-   }
-
-   public static class ConfigParser
-   {
-      private final String connectorFactoryClassName;
-
-      private final Map<String, Object> connectorParameters;
-
-      private String username;
-
-      private String password;
-
-      public ConfigParser(final String config)
-      {
-         if (config == null || config.length() == 0)
-         {
-            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
-         }
-
-         String[] strings = config.split(",");
-
-         // First (mandatory) param is the connector factory class name
-         if (strings.length < 1)
-         {
-            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
-         }
-
-         connectorFactoryClassName = strings[0].trim();
-
-         // Next two (optional) parameters are the username and password to use for creating the session for recovery
-
-         if (strings.length >= 2)
-         {
-
-            username = strings[1].trim();
-            if (username.length() == 0)
-            {
-               username = null;
-            }
-
-            if (strings.length == 2)
-            {
-               throw new IllegalArgumentException("If username is specified, password must be specified too");
-            }
-
-            password = strings[2].trim();
-            if (password.length() == 0)
-            {
-               password = null;
-            }
-         }
-
-         // other tokens are for connector configurations
-         connectorParameters = new HashMap<String, Object>();
-         if (strings.length >= 3)
-         {
-            for (int i = 3; i < strings.length; i++)
-            {
-               String[] str = strings[i].split("=");
-               if (str.length == 2)
-               {
-                  connectorParameters.put(str[0].trim(), str[1].trim());
-               }
-            }
-         }
-      }
-
-      public String getConnectorFactoryClassName()
-      {
-         return connectorFactoryClassName;
-      }
-
-      public Map<String, Object> getConnectorParameters()
-      {
-         return connectorParameters;
-      }
-
-      public String getUsername()
-      {
-         return username;
-      }
-
-      public String getPassword()
-      {
-         return password;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java
deleted file mode 100644
index 293137b..0000000
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/HornetQXAResourceWrapper.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.apache.activemq.jms.server.recovery;
-
-import java.util.Arrays;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.ActiveMQNotConnectedException;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.api.core.client.HornetQClient;
-import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.api.core.client.SessionFailureListener;
-import org.apache.activemq.jms.server.HornetQJMSServerLogger;
-
-/**
- * XAResourceWrapper.
- *
- * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
- *
- * The reason why we don't use that class directly is that it assumes on failure of connection
- * the RM_FAIL or RM_ERR is thrown, but in HornetQ we throw XA_RETRY since we want the recovery manager to be able
- * to retry on failure without having to manually retry
- *
- * @author <a href="adrian@jboss.com">Adrian Brock</a>
- * @author <a href="tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *
- * @version $Revision: 45341 $
- */
-public class HornetQXAResourceWrapper implements XAResource, SessionFailureListener
-{
-   /** The state lock */
-   private static final Object lock = new Object();
-
-   private ServerLocator serverLocator;
-
-   private ClientSessionFactory csf;
-
-   private ClientSession delegate;
-
-   private XARecoveryConfig[] xaRecoveryConfigs;
-
-   public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
-   {
-      this.xaRecoveryConfigs = xaRecoveryConfigs;
-
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
-            ", instance=" +
-            System.identityHashCode(this));
-      }
-   }
-
-   public Xid[] recover(final int flag) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs));
-      }
-
-      try
-      {
-         Xid[] xids = xaResource.recover(flag);
-
-         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0)
-         {
-            HornetQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this);
-         }
-
-         return xids;
-      }
-      catch (XAException e)
-      {
-         HornetQJMSServerLogger.LOGGER.xaRecoverError(e);
-         throw check(e);
-      }
-   }
-
-   public void commit(final Xid xid, final boolean onePhase) throws XAException
-   {
-      XAResource xaResource = getDelegate(true);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
-      }
-      try
-      {
-         xaResource.commit(xid, onePhase);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public void rollback(final Xid xid) throws XAException
-   {
-      XAResource xaResource = getDelegate(true);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid ");
-      }
-      try
-      {
-         xaResource.rollback(xid);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public void forget(final Xid xid) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid ");
-      }
-
-      try
-      {
-         xaResource.forget(xid);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public boolean isSameRM(XAResource xaRes) throws XAException
-   {
-      if (xaRes instanceof HornetQXAResourceWrapper)
-      {
-         xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate(false);
-      }
-
-      XAResource xaResource = getDelegate(false);
-      try
-      {
-         return xaResource.isSameRM(xaRes);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public int prepare(final Xid xid) throws XAException
-   {
-      XAResource xaResource = getDelegate(true);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid ");
-      }
-      try
-      {
-         return xaResource.prepare(xid);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public void start(final Xid xid, final int flags) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid ");
-      }
-      try
-      {
-         xaResource.start(xid, flags);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public void end(final Xid xid, final int flags) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid ");
-      }
-      try
-      {
-         xaResource.end(xid, flags);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public int getTransactionTimeout() throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid ");
-      }
-      try
-      {
-         return xaResource.getTransactionTimeout();
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public boolean setTransactionTimeout(final int seconds) throws XAException
-   {
-      XAResource xaResource = getDelegate(false);
-      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-      {
-         HornetQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid ");
-      }
-      try
-      {
-         return xaResource.setTransactionTimeout(seconds);
-      }
-      catch (XAException e)
-      {
-         throw check(e);
-      }
-   }
-
-   public void connectionFailed(final ActiveMQException me, boolean failedOver)
-   {
-      if (me.getType() == ActiveMQExceptionType.DISCONNECTED)
-      {
-         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            HornetQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me);
-         }
-      }
-      else
-      {
-         HornetQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf);
-      }
-      close();
-   }
-
-   @Override
-   public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID)
-   {
-      connectionFailed(me, failedOver);
-   }
-
-   public void beforeReconnect(final ActiveMQException me)
-   {
-   }
-
-   /**
-    * Get the connectionFactory XAResource
-    *
-    * @return the connectionFactory
-    * @throws XAException for any problem
-    */
-   private XAResource getDelegate(boolean retry) throws XAException
-   {
-      XAResource result = null;
-      Exception error = null;
-      try
-      {
-         result = connect();
-      }
-      catch (Exception e)
-      {
-         error = e;
-      }
-
-      if (result == null)
-      {
-         // we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and
-         // all chaos is let loose
-         if (retry)
-         {
-            XAException xae = new XAException("Connection unavailable for xa recovery");
-            xae.errorCode = XAException.XA_RETRY;
-            if (error != null)
-            {
-               xae.initCause(error);
-            }
-            if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-            {
-               HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
-            }
-            throw xae;
-         }
-         else
-         {
-            XAException xae = new XAException("Error trying to connect to any providers for xa recovery");
-            xae.errorCode = XAException.XAER_RMERR;
-            if (error != null)
-            {
-               xae.initCause(error);
-            }
-            if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-            {
-               HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
-            }
-            throw xae;
-         }
-
-      }
-
-      return result;
-   }
-
-   /**
-    * Connect to the server if not already done so
-    *
-    * @return the connectionFactory XAResource
-    * @throws Exception for any problem
-    */
-   protected XAResource connect() throws Exception
-   {
-      // Do we already have a valid connectionFactory?
-      synchronized (HornetQXAResourceWrapper.lock)
-      {
-         if (delegate != null)
-         {
-            return delegate;
-         }
-      }
-
-      for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
-      {
-
-         if (xaRecoveryConfig == null)
-         {
-            continue;
-         }
-         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-         {
-            HornetQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
-         }
-
-         ClientSession cs = null;
-
-         try
-         {
-            // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions
-            // we really need to control what server it's connected to
-
-            // Manual configuration may still use discovery, so we will keep this
-            if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
-            {
-               serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration());
-            }
-            else
-            {
-               serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig());
-            }
-            serverLocator.disableFinalizeCheck();
-            csf = serverLocator.createSessionFactory();
-            if (xaRecoveryConfig.getUsername() == null)
-            {
-               cs = csf.createSession(true, false, false);
-            }
-            else
-            {
-               cs = csf.createSession(xaRecoveryConfig.getUsername(),
-                  xaRecoveryConfig.getPassword(),
-                  true,
-                  false,
-                  false,
-                  false,
-                  1);
-            }
-         }
-         catch (Throwable e)
-         {
-            HornetQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
-            if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
-            {
-               HornetQJMSServerLogger.LOGGER.debug(e.getMessage(), e);
-            }
-
-            try
-            {
-               if (cs != null) cs.close();
-               if (serverLocator != null) serverLocator.close();
-            }
-            catch (Throwable ignored)
-            {
-               if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
-               {
-                  HornetQJMSServerLogger.LOGGER.trace(e.getMessage(), ignored);
-               }
-            }
-            continue;
-         }
-
-         cs.addFailureListener(this);
-
-         synchronized (HornetQXAResourceWrapper.lock)
-         {
-            delegate = cs;
-         }
-
-         return delegate;
-      }
-      HornetQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
-      throw new ActiveMQNotConnectedException();
-   }
-
-   /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
-   @Override
-   public String toString()
-   {
-      return "HornetQXAResourceWrapper [serverLocator=" + serverLocator +
-         ", csf=" +
-         csf +
-         ", delegate=" +
-         delegate +
-         ", xaRecoveryConfigs=" +
-         Arrays.toString(xaRecoveryConfigs) +
-         ", instance=" +
-         System.identityHashCode(this) +
-         "]";
-   }
-
-   /**
-    * Close the connection
-    */
-   public void close()
-   {
-      ServerLocator oldServerLocator = null;
-      ClientSessionFactory oldCSF = null;
-      ClientSession oldDelegate = null;
-      synchronized (HornetQXAResourceWrapper.lock)
-      {
-         oldCSF = csf;
-         csf = null;
-         oldDelegate = delegate;
-         delegate = null;
-         oldServerLocator = serverLocator;
-         serverLocator = null;
-      }
-
-      if (oldDelegate != null)
-      {
-         try
-         {
-            oldDelegate.close();
-         }
-         catch (Throwable ignorable)
-         {
-            HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
-         }
-      }
-
-      if (oldCSF != null)
-      {
-         try
-         {
-            oldCSF.close();
-         }
-         catch (Throwable ignorable)
-         {
-            HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
-         }
-      }
-
-      if (oldServerLocator != null)
-      {
-         try
-         {
-            oldServerLocator.close();
-         }
-         catch (Throwable ignorable)
-         {
-            HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
-         }
-      }
-   }
-
-   /**
-    * Check whether an XAException is fatal. If it is an RM problem
-    * we close the connection so the next call will reconnect.
-    *
-    * @param e the xa exception
-    * @return never
-    * @throws XAException always
-    */
-   protected XAException check(final XAException e) throws XAException
-   {
-      HornetQJMSServerLogger.LOGGER.xaRecoveryError(e);
-
-
-      // If any exception happened, we close the connection so we may start fresh
-      close();
-      throw e;
-   }
-
-   @Override
-   protected void finalize() throws Throwable
-   {
-      close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
index 7510361..b705db9 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/RecoveryDiscovery.java
@@ -23,12 +23,12 @@ import org.apache.activemq.api.core.client.ServerLocator;
 import org.apache.activemq.api.core.client.SessionFailureListener;
 import org.apache.activemq.api.core.client.TopologyMember;
 import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.jms.server.HornetQJMSServerLogger;
+import org.apache.activemq.jms.server.ActiveMQJMSServerLogger;
 
 /**
  * <p>This class will have a simple Connection Factory and will listen
  * for topology updates. </p>
- * <p>This Discovery is instantiated by {@link HornetQRecoveryRegistry}
+ * <p>This Discovery is instantiated by {@link ActiveMQRecoveryRegistry}
  *
  * @author clebertsuconic
  */
@@ -51,7 +51,7 @@ public class RecoveryDiscovery implements SessionFailureListener
    {
       if (!started)
       {
-         HornetQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config);
+         ActiveMQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config);
          started = true;
 
          locator = config.createServerLocator();
@@ -65,16 +65,16 @@ public class RecoveryDiscovery implements SessionFailureListener
             // in case of failure we will retry
             sessionFactory.addFailureListener(this);
 
-            HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config);
+            ActiveMQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config);
          }
          catch (Exception startupError)
          {
             if (!retry)
             {
-               HornetQJMSServerLogger.LOGGER.xaRecoveryStartError(config);
+               ActiveMQJMSServerLogger.LOGGER.xaRecoveryStartError(config);
             }
             stop();
-            HornetQRecoveryRegistry.getInstance().failedDiscovery(this);
+            ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
          }
 
       }
@@ -123,7 +123,7 @@ public class RecoveryDiscovery implements SessionFailureListener
          }
          catch (Exception ignored)
          {
-            HornetQJMSServerLogger.LOGGER.debug(ignored, ignored);
+            ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
          }
 
          try
@@ -132,7 +132,7 @@ public class RecoveryDiscovery implements SessionFailureListener
          }
          catch (Exception ignored)
          {
-            HornetQJMSServerLogger.LOGGER.debug(ignored, ignored);
+            ActiveMQJMSServerLogger.LOGGER.debug(ignored, ignored);
          }
 
          sessionFactory = null;
@@ -160,7 +160,7 @@ public class RecoveryDiscovery implements SessionFailureListener
             Pair<TransportConfiguration, TransportConfiguration> connector =
                new Pair<TransportConfiguration, TransportConfiguration>(topologyMember.getLive(),
                                                                         topologyMember.getBackup());
-            HornetQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector,
+            ActiveMQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector,
                                                          config.getUsername(), config.getPassword());
          }
       }
@@ -180,15 +180,15 @@ public class RecoveryDiscovery implements SessionFailureListener
    {
       if (exception.getType() == ActiveMQExceptionType.DISCONNECTED)
       {
-         HornetQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception);
+         ActiveMQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception);
       }
       else
       {
-         HornetQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery",
+         ActiveMQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery",
                                             exception);
       }
       internalStop();
-      HornetQRecoveryRegistry.getInstance().failedDiscovery(this);
+      ActiveMQRecoveryRegistry.getInstance().failedDiscovery(this);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
index 6c00917..24e40c0 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/XARecoveryConfig.java
@@ -16,9 +16,9 @@ import java.util.Arrays;
 
 import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.HornetQClient;
+import org.apache.activemq.api.core.client.ActiveMQClient;
 import org.apache.activemq.api.core.client.ServerLocator;
-import org.apache.activemq.jms.client.HornetQConnectionFactory;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
 
 /**
  *
@@ -40,7 +40,7 @@ public class XARecoveryConfig
    private final String username;
    private final String password;
 
-   public static XARecoveryConfig newConfig(HornetQConnectionFactory factory,
+   public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
                                             String userName,
                                             String password)
    {
@@ -107,11 +107,11 @@ public class XARecoveryConfig
    {
       if (getDiscoveryConfiguration() != null)
       {
-         return HornetQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
+         return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
       }
       else
       {
-         return HornetQClient.createServerLocator(isHA(), getTransportConfig());
+         return ActiveMQClient.createServerLocator(isHA(), getTransportConfig());
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
index 7d0ae6e..a44d114 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/recovery/package-info.java
@@ -12,10 +12,10 @@
  */
 /**
  * This package is used to locate resources and connectors along the cluster set
- * I - JCA Connection Factories or InBound MDBs will call HornetQRegistryBase::register(XARecoveryConfig)
+ * I - JCA Connection Factories or InBound MDBs will call ActiveMQRegistryBase::register(XARecoveryConfig)
  * II - For each XARecoveryConfig the RegistryBase will instantiate a ResourceDiscoveryUnit which will
  *      connect using that configuration and inform the Registry of any topology members
- * III - For each topology member found on the DiscoveryUnits, the RegistryBase will registry a HornetQResourceRecovery
+ * III - For each topology member found on the DiscoveryUnits, the RegistryBase will register an ActiveMQResourceRecovery
  *       that will exist per server
   */
 package org.apache.activemq.jms.server.recovery;


Mime
View raw message