activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [11/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:41 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java
new file mode 100644
index 0000000..094c4be
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRecoveryRegistry.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.server.recovery;
+
+import javax.transaction.xa.XAResource;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.jms.server.HornetQJMSServerLogger;
+import org.jboss.tm.XAResourceRecovery;
+
+/**
+ * <p>This class is used by the Resource Adapter to register RecoveryDiscovery, which is based on the {@link XARecoveryConfig}</p>
+ * <p>Each outbound or inboud connection will pass the configuration here through by calling the method {@link HornetQRecoveryRegistry#register(XARecoveryConfig)}</p>
+ * <p>Later the {@link RecoveryDiscovery} will call {@link HornetQRecoveryRegistry#nodeUp(String, Pair, String, String)}
+ * so we will keep a track of nodes on the cluster
+ * or nodes where this server is connected to. </p>
+ *
+ * @author clebertsuconic
+ */
+public class HornetQRecoveryRegistry implements XAResourceRecovery
+{
+
+   private static final HornetQRecoveryRegistry theInstance = new HornetQRecoveryRegistry();
+
+   private final ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery> configSet = new ConcurrentHashMap<XARecoveryConfig, RecoveryDiscovery>();
+
+   /**
+    * The list by server id and resource adapter wrapper, what will actually be calling recovery.
+    * This will be returned by getXAResources
+    */
+   private final ConcurrentHashMap<String, HornetQXAResourceWrapper> recoveries = new ConcurrentHashMap<String, HornetQXAResourceWrapper>();
+
+   /**
+    * In case of failures, we retry on the next getXAResources
+    */
+   private final Set<RecoveryDiscovery> failedDiscoverySet = new HashSet<RecoveryDiscovery>();
+
+   private HornetQRecoveryRegistry()
+   {
+   }
+
+   /**
+    * This will be called periodically by the Transaction Manager
+    */
+   public XAResource[] getXAResources()
+   {
+      try
+      {
+         checkFailures();
+
+         HornetQXAResourceWrapper[] resourceArray = new HornetQXAResourceWrapper[recoveries.size()];
+         resourceArray = recoveries.values().toArray(resourceArray);
+
+         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            HornetQJMSServerLogger.LOGGER.debug("\n=======================================================================================");
+            HornetQJMSServerLogger.LOGGER.debug("Returning the following list on getXAREsources:");
+            for (Map.Entry<String, HornetQXAResourceWrapper> entry : recoveries.entrySet())
+            {
+               HornetQJMSServerLogger.LOGGER.debug("server-id=" + entry.getKey() + ", value=" + entry.getValue());
+            }
+            HornetQJMSServerLogger.LOGGER.debug("=======================================================================================\n");
+         }
+
+         return resourceArray;
+      }
+      catch (Throwable e)
+      {
+         HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
+         return new XAResource[]{};
+      }
+   }
+
+   public static HornetQRecoveryRegistry getInstance()
+   {
+      return theInstance;
+   }
+
+   /**
+    * This will be called by then resource adapters, to register a new discovery
+    *
+    * @param resourceConfig
+    */
+   public void register(final XARecoveryConfig resourceConfig)
+   {
+      RecoveryDiscovery newInstance = new RecoveryDiscovery(resourceConfig);
+      RecoveryDiscovery discoveryRecord = configSet.putIfAbsent(resourceConfig, newInstance);
+      if (discoveryRecord == null)
+      {
+         discoveryRecord = newInstance;
+         discoveryRecord.start(false);
+      }
+      // you could have a configuration shared with multiple MDBs or RAs
+      discoveryRecord.incrementUsage();
+   }
+
+   /**
+    * Reference counts and deactivate a configuration
+    * Notice: this won't remove the servers since a server may have previous XIDs
+    *
+    * @param resourceConfig
+    */
+   public void unRegister(final XARecoveryConfig resourceConfig)
+   {
+      RecoveryDiscovery discoveryRecord = configSet.get(resourceConfig);
+      if (discoveryRecord != null && discoveryRecord.decrementUsage() == 0)
+      {
+         discoveryRecord = configSet.remove(resourceConfig);
+         if (discoveryRecord != null)
+         {
+            discoveryRecord.stop();
+         }
+      }
+   }
+
+   /**
+    * We need to make sure that all resources are closed, we don't actually do this when a resourceConfig is closed but
+    * maybe we should.
+    */
+   public void stop()
+   {
+      for (RecoveryDiscovery recoveryDiscovery : configSet.values())
+      {
+         recoveryDiscovery.stop();
+      }
+      for (HornetQXAResourceWrapper hornetQXAResourceWrapper : recoveries.values())
+      {
+         hornetQXAResourceWrapper.close();
+      }
+      recoveries.clear();
+      configSet.clear();
+   }
+
+   /**
+    * in case of a failure the Discovery will register itslef to retry
+    *
+    * @param failedDiscovery
+    */
+   public void failedDiscovery(RecoveryDiscovery failedDiscovery)
+   {
+      HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery being set to restart:" + failedDiscovery);
+      synchronized (failedDiscoverySet)
+      {
+         failedDiscoverySet.add(failedDiscovery);
+      }
+   }
+
+   /**
+    * @param nodeID
+    * @param networkConfiguration
+    * @param username
+    * @param password
+    */
+   public void nodeUp(String nodeID,
+                      Pair<TransportConfiguration, TransportConfiguration> networkConfiguration,
+                      String username,
+                      String password)
+   {
+
+      if (recoveries.get(nodeID) == null)
+      {
+         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            HornetQJMSServerLogger.LOGGER.debug(nodeID + " being registered towards " + networkConfiguration);
+         }
+         XARecoveryConfig config = new XARecoveryConfig(true,
+                                                        extractTransportConfiguration(networkConfiguration),
+                                                        username,
+                                                        password);
+
+         HornetQXAResourceWrapper wrapper = new HornetQXAResourceWrapper(config);
+         recoveries.putIfAbsent(nodeID, wrapper);
+      }
+   }
+
+   public void nodeDown(String nodeID)
+   {
+   }
+
+   /**
+    * this will go through the list of retries
+    */
+   private void checkFailures()
+   {
+      final HashSet<RecoveryDiscovery> failures = new HashSet<RecoveryDiscovery>();
+
+      // it will transfer all the discoveries to a new collection
+      synchronized (failedDiscoverySet)
+      {
+         failures.addAll(failedDiscoverySet);
+         failedDiscoverySet.clear();
+      }
+
+      if (failures.size() > 0)
+      {
+         // This shouldn't happen on a regular scenario, however when this retry happens this needs
+         // to be done on a new thread
+         Thread t = new Thread("HornetQ Recovery Discovery Reinitialization")
+         {
+            @Override
+            public void run()
+            {
+               for (RecoveryDiscovery discovery : failures)
+               {
+                  try
+                  {
+                     HornetQJMSServerLogger.LOGGER.debug("Retrying discovery " + discovery);
+                     discovery.start(true);
+                  }
+                  catch (Throwable e)
+                  {
+                     HornetQJMSServerLogger.LOGGER.warn(e.getMessage(), e);
+                  }
+               }
+            }
+         };
+
+         t.start();
+      }
+   }
+
+   /**
+    * @param networkConfiguration
+    * @return
+    */
+   private TransportConfiguration[] extractTransportConfiguration(Pair<TransportConfiguration, TransportConfiguration> networkConfiguration)
+   {
+      if (networkConfiguration.getB() != null)
+      {
+         return new TransportConfiguration[]{networkConfiguration.getA(), networkConfiguration.getB()};
+      }
+      return new TransportConfiguration[]{networkConfiguration.getA()};
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java
new file mode 100644
index 0000000..a0b7f5e
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQRegistryBase.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.server.recovery;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jboss.tm.XAResourceRecoveryRegistry;
+
+/**
+ * This class is a base class for the integration layer where
+ * This class is used on integration points and this is just a bridge to the real registry at
+ * {@link HornetQRecoveryRegistry}
+ *
+ * @author Clebert
+ *
+ *
+ */
+public abstract class HornetQRegistryBase
+{
+
+   private final AtomicBoolean started = new AtomicBoolean(false);
+
+   public HornetQRegistryBase()
+   {
+   }
+
+
+   public abstract XAResourceRecoveryRegistry getTMRegistry();
+
+   public void register(final XARecoveryConfig resourceConfig)
+   {
+      init();
+      HornetQRecoveryRegistry.getInstance().register(resourceConfig);
+   }
+
+
+
+   public void unRegister(final XARecoveryConfig resourceConfig)
+   {
+      init();
+      HornetQRecoveryRegistry.getInstance().unRegister(resourceConfig);
+   }
+
+   public void stop()
+   {
+      if (started.compareAndSet(true, false) && getTMRegistry() != null)
+      {
+         getTMRegistry().removeXAResourceRecovery(HornetQRecoveryRegistry.getInstance());
+         HornetQRecoveryRegistry.getInstance().stop();
+      }
+   }
+
+   private void init()
+   {
+      if (started.compareAndSet(false, true) && getTMRegistry() != null)
+      {
+         getTMRegistry().addXAResourceRecovery(HornetQRecoveryRegistry.getInstance());
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java
new file mode 100644
index 0000000..947ddb6
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceRecovery.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.server.recovery;
+
+import javax.transaction.xa.XAResource;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.arjuna.ats.jta.recovery.XAResourceRecovery;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.jms.server.HornetQJMSServerLogger;
+
+/**
+ * A XAResourceRecovery instance that can be used to recover any JMS provider.
+ * <p>
+ * In reality only recover, rollback and commit will be called but we still need to be implement all
+ * methods just in case.
+ * <p>
+ * To enable this add the following to the jbossts-properties file
+ * <pre>
+ * &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
+ *                 value="org.apache.activemq6.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/&gt;
+ * </pre>
+ * <p>
+ * you'll need something like this if the HornetQ Server is remote
+ * <pre>
+ *      &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
+ *                  value="org.apache.activemq6.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/&gt;
+ * </pre>
+ * <p>
+ * you'll need something like this if the HornetQ Server is remote and has failover configured
+ * <pre>
+ *             &lt;property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
+ *                       value="org.apache.activemq6.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/&gt;
+ * </pre>
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ */
+public class HornetQXAResourceRecovery implements XAResourceRecovery
+{
+   private final boolean trace = HornetQJMSServerLogger.LOGGER.isTraceEnabled();
+
+   private boolean hasMore;
+
+   private HornetQXAResourceWrapper res;
+
+   public HornetQXAResourceRecovery()
+   {
+      if (trace)
+      {
+         HornetQJMSServerLogger.LOGGER.trace("Constructing HornetQXAResourceRecovery");
+      }
+   }
+
+   public boolean initialise(final String config)
+   {
+      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.trace(this + " intialise: " + config);
+      }
+
+      String[] configs = config.split(";");
+      XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
+      for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+      {
+         String s = configs[i];
+         ConfigParser parser = new ConfigParser(s);
+         String connectorFactoryClassName = parser.getConnectorFactoryClassName();
+         Map<String, Object> connectorParams = parser.getConnectorParameters();
+         String username = parser.getUsername();
+         String password = parser.getPassword();
+         TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
+         xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password);
+      }
+
+
+      res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
+
+      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.trace(this + " initialised");
+      }
+
+      return true;
+   }
+
+   public boolean hasMoreResources()
+   {
+      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.trace(this + " hasMoreResources");
+      }
+
+      /*
+       * The way hasMoreResources is supposed to work is as follows:
+       * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
+       * true it will call getXAResource.
+       * It will repeat that until hasMoreResources returns false.
+       * Then the sweep is over.
+       * For the next sweep hasMoreResources should return true, etc.
+       *
+       * In our case where we only need to return one XAResource per sweep,
+       * hasMoreResources should basically alternate between true and false.
+       *
+       *
+       */
+
+      hasMore = !hasMore;
+
+      return hasMore;
+   }
+
+   public XAResource getXAResource()
+   {
+      if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.trace(this + " getXAResource");
+      }
+
+      return res;
+   }
+
+   public XAResource[] getXAResources()
+   {
+      return new XAResource[]{res};
+   }
+
+   @Override
+   protected void finalize()
+   {
+      res.close();
+   }
+
+   public static class ConfigParser
+   {
+      private final String connectorFactoryClassName;
+
+      private final Map<String, Object> connectorParameters;
+
+      private String username;
+
+      private String password;
+
+      public ConfigParser(final String config)
+      {
+         if (config == null || config.length() == 0)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         String[] strings = config.split(",");
+
+         // First (mandatory) param is the connector factory class name
+         if (strings.length < 1)
+         {
+            throw new IllegalArgumentException("Must specify provider connector factory class name in config");
+         }
+
+         connectorFactoryClassName = strings[0].trim();
+
+         // Next two (optional) parameters are the username and password to use for creating the session for recovery
+
+         if (strings.length >= 2)
+         {
+
+            username = strings[1].trim();
+            if (username.length() == 0)
+            {
+               username = null;
+            }
+
+            if (strings.length == 2)
+            {
+               throw new IllegalArgumentException("If username is specified, password must be specified too");
+            }
+
+            password = strings[2].trim();
+            if (password.length() == 0)
+            {
+               password = null;
+            }
+         }
+
+         // other tokens are for connector configurations
+         connectorParameters = new HashMap<String, Object>();
+         if (strings.length >= 3)
+         {
+            for (int i = 3; i < strings.length; i++)
+            {
+               String[] str = strings[i].split("=");
+               if (str.length == 2)
+               {
+                  connectorParameters.put(str[0].trim(), str[1].trim());
+               }
+            }
+         }
+      }
+
+      public String getConnectorFactoryClassName()
+      {
+         return connectorFactoryClassName;
+      }
+
+      public Map<String, Object> getConnectorParameters()
+      {
+         return connectorParameters;
+      }
+
+      public String getUsername()
+      {
+         return username;
+      }
+
+      public String getPassword()
+      {
+         return password;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java
new file mode 100644
index 0000000..c818362
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/HornetQXAResourceWrapper.java
@@ -0,0 +1,531 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.server.recovery;
+
+import java.util.Arrays;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.api.core.HornetQNotConnectedException;
+import org.apache.activemq6.api.core.client.ClientSession;
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+import org.apache.activemq6.api.core.client.HornetQClient;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.core.client.SessionFailureListener;
+import org.apache.activemq6.jms.server.HornetQJMSServerLogger;
+
+/**
+ * XAResourceWrapper.
+ *
+ * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
+ *
+ * The reason why we don't use that class directly is that it assumes on failure of connection
+ * the RM_FAIL or RM_ERR is thrown, but in HornetQ we throw XA_RETRY since we want the recovery manager to be able
+ * to retry on failure without having to manually retry
+ *
+ * @author <a href="adrian@jboss.com">Adrian Brock</a>
+ * @author <a href="tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *
+ * @version $Revision: 45341 $
+ */
+public class HornetQXAResourceWrapper implements XAResource, SessionFailureListener
+{
+   /** The state lock */
+   private static final Object lock = new Object();
+
+   private ServerLocator serverLocator;
+
+   private ClientSessionFactory csf;
+
+   private ClientSession delegate;
+
+   private XARecoveryConfig[] xaRecoveryConfigs;
+
+   public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
+   {
+      this.xaRecoveryConfigs = xaRecoveryConfigs;
+
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("Recovery configured with " + Arrays.toString(xaRecoveryConfigs) +
+            ", instance=" +
+            System.identityHashCode(this));
+      }
+   }
+
+   public Xid[] recover(final int flag) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("looking for recover at " + xaResource + " configuration " + Arrays.toString(this.xaRecoveryConfigs));
+      }
+
+      try
+      {
+         Xid[] xids = xaResource.recover(flag);
+
+         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled() && xids != null && xids.length > 0)
+         {
+            HornetQJMSServerLogger.LOGGER.debug("Recovering these following IDs " + Arrays.toString(xids) + " at " + this);
+         }
+
+         return xids;
+      }
+      catch (XAException e)
+      {
+         HornetQJMSServerLogger.LOGGER.xaRecoverError(e);
+         throw check(e);
+      }
+   }
+
+   public void commit(final Xid xid, final boolean onePhase) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
+      }
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void rollback(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("Rollback " + xaResource + " xid ");
+      }
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void forget(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("Forget " + xaResource + " xid ");
+      }
+
+      try
+      {
+         xaResource.forget(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      if (xaRes instanceof HornetQXAResourceWrapper)
+      {
+         xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate(false);
+      }
+
+      XAResource xaResource = getDelegate(false);
+      try
+      {
+         return xaResource.isSameRM(xaRes);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public int prepare(final Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate(true);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("prepare " + xaResource + " xid ");
+      }
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void start(final Xid xid, final int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("start " + xaResource + " xid ");
+      }
+      try
+      {
+         xaResource.start(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void end(final Xid xid, final int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("end " + xaResource + " xid ");
+      }
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public int getTransactionTimeout() throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("getTransactionTimeout " + xaResource + " xid ");
+      }
+      try
+      {
+         return xaResource.getTransactionTimeout();
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public boolean setTransactionTimeout(final int seconds) throws XAException
+   {
+      XAResource xaResource = getDelegate(false);
+      if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+      {
+         HornetQJMSServerLogger.LOGGER.debug("setTransactionTimeout " + xaResource + " xid ");
+      }
+      try
+      {
+         return xaResource.setTransactionTimeout(seconds);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void connectionFailed(final HornetQException me, boolean failedOver)
+   {
+      if (me.getType() == HornetQExceptionType.DISCONNECTED)
+      {
+         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            HornetQJMSServerLogger.LOGGER.debug("being disconnected for server shutdown", me);
+         }
+      }
+      else
+      {
+         HornetQJMSServerLogger.LOGGER.xaRecoverConnectionError(me, csf);
+      }
+      close();
+   }
+
+   @Override
+   public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID)
+   {
+      connectionFailed(me, failedOver);
+   }
+
+   public void beforeReconnect(final HornetQException me)
+   {
+   }
+
+   /**
+    * Get the connectionFactory XAResource
+    *
+    * @return the connectionFactory
+    * @throws XAException for any problem
+    */
+   private XAResource getDelegate(boolean retry) throws XAException
+   {
+      XAResource result = null;
+      Exception error = null;
+      try
+      {
+         result = connect();
+      }
+      catch (Exception e)
+      {
+         error = e;
+      }
+
+      if (result == null)
+      {
+         // we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and
+         // all chaos is let loose
+         if (retry)
+         {
+            XAException xae = new XAException("Connection unavailable for xa recovery");
+            xae.errorCode = XAException.XA_RETRY;
+            if (error != null)
+            {
+               xae.initCause(error);
+            }
+            if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+            {
+               HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
+            }
+            throw xae;
+         }
+         else
+         {
+            XAException xae = new XAException("Error trying to connect to any providers for xa recovery");
+            xae.errorCode = XAException.XAER_RMERR;
+            if (error != null)
+            {
+               xae.initCause(error);
+            }
+            if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+            {
+               HornetQJMSServerLogger.LOGGER.debug("Cannot get connectionFactory XAResource", xae);
+            }
+            throw xae;
+         }
+
+      }
+
+      return result;
+   }
+
+   /**
+    * Connect to the server if not already done so
+    *
+    * @return the connectionFactory XAResource
+    * @throws Exception for any problem
+    */
+   protected XAResource connect() throws Exception
+   {
+      // Do we already have a valid connectionFactory?
+      synchronized (HornetQXAResourceWrapper.lock)
+      {
+         if (delegate != null)
+         {
+            return delegate;
+         }
+      }
+
+      for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
+      {
+
+         if (xaRecoveryConfig == null)
+         {
+            continue;
+         }
+         if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+         {
+            HornetQJMSServerLogger.LOGGER.debug("Trying to connect recovery on " + xaRecoveryConfig + " of " + Arrays.toString(xaRecoveryConfigs));
+         }
+
+         ClientSession cs = null;
+
+         try
+         {
+            // setting ha=false because otherwise the connector would go towards any server, causing Heuristic exceptions
+            // we really need to control what server it's connected to
+
+            // Manual configuration may still use discovery, so we will keep this
+            if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
+            {
+               serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getDiscoveryConfiguration());
+            }
+            else
+            {
+               serverLocator = HornetQClient.createServerLocator(false, xaRecoveryConfig.getTransportConfig());
+            }
+            serverLocator.disableFinalizeCheck();
+            csf = serverLocator.createSessionFactory();
+            if (xaRecoveryConfig.getUsername() == null)
+            {
+               cs = csf.createSession(true, false, false);
+            }
+            else
+            {
+               cs = csf.createSession(xaRecoveryConfig.getUsername(),
+                  xaRecoveryConfig.getPassword(),
+                  true,
+                  false,
+                  false,
+                  false,
+                  1);
+            }
+         }
+         catch (Throwable e)
+         {
+            HornetQJMSServerLogger.LOGGER.xaRecoverAutoConnectionError(e, xaRecoveryConfig);
+            if (HornetQJMSServerLogger.LOGGER.isDebugEnabled())
+            {
+               HornetQJMSServerLogger.LOGGER.debug(e.getMessage(), e);
+            }
+
+            try
+            {
+               if (cs != null) cs.close();
+               if (serverLocator != null) serverLocator.close();
+            }
+            catch (Throwable ignored)
+            {
+               if (HornetQJMSServerLogger.LOGGER.isTraceEnabled())
+               {
+                  HornetQJMSServerLogger.LOGGER.trace(e.getMessage(), ignored);
+               }
+            }
+            continue;
+         }
+
+         cs.addFailureListener(this);
+
+         synchronized (HornetQXAResourceWrapper.lock)
+         {
+            delegate = cs;
+         }
+
+         return delegate;
+      }
+      HornetQJMSServerLogger.LOGGER.recoveryConnectFailed(Arrays.toString(xaRecoveryConfigs));
+      throw new HornetQNotConnectedException();
+   }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "HornetQXAResourceWrapper [serverLocator=" + serverLocator +
+         ", csf=" +
+         csf +
+         ", delegate=" +
+         delegate +
+         ", xaRecoveryConfigs=" +
+         Arrays.toString(xaRecoveryConfigs) +
+         ", instance=" +
+         System.identityHashCode(this) +
+         "]";
+   }
+
+   /**
+    * Close the connection
+    */
+   public void close()
+   {
+      ServerLocator oldServerLocator = null;
+      ClientSessionFactory oldCSF = null;
+      ClientSession oldDelegate = null;
+      synchronized (HornetQXAResourceWrapper.lock)
+      {
+         oldCSF = csf;
+         csf = null;
+         oldDelegate = delegate;
+         delegate = null;
+         oldServerLocator = serverLocator;
+         serverLocator = null;
+      }
+
+      if (oldDelegate != null)
+      {
+         try
+         {
+            oldDelegate.close();
+         }
+         catch (Throwable ignorable)
+         {
+            HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
+         }
+      }
+
+      if (oldCSF != null)
+      {
+         try
+         {
+            oldCSF.close();
+         }
+         catch (Throwable ignorable)
+         {
+            HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
+         }
+      }
+
+      if (oldServerLocator != null)
+      {
+         try
+         {
+            oldServerLocator.close();
+         }
+         catch (Throwable ignorable)
+         {
+            HornetQJMSServerLogger.LOGGER.debug(ignorable.getMessage(), ignorable);
+         }
+      }
+   }
+
+   /**
+    * Check whether an XAException is fatal. If it is an RM problem
+    * we close the connection so the next call will reconnect.
+    *
+    * @param e the xa exception
+    * @return never
+    * @throws XAException always
+    */
+   protected XAException check(final XAException e) throws XAException
+   {
+      HornetQJMSServerLogger.LOGGER.xaRecoveryError(e);
+
+
+      // If any exception happened, we close the connection so we may start fresh
+      close();
+      throw e;
+   }
+
+   @Override
+   protected void finalize() throws Throwable
+   {
+      close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java
new file mode 100644
index 0000000..4291a53
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/RecoveryDiscovery.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.server.recovery;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.ClusterTopologyListener;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.api.core.client.SessionFailureListener;
+import org.apache.activemq6.api.core.client.TopologyMember;
+import org.apache.activemq6.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq6.jms.server.HornetQJMSServerLogger;
+
+/**
+ * <p>This class will have a simple Connection Factory and will listen
+ * for topology updates. </p>
+ * <p>This Discovery is instantiated by {@link HornetQRecoveryRegistry}
+ *
+ * @author clebertsuconic
+ */
+public class RecoveryDiscovery implements SessionFailureListener
+{
+
+   private ServerLocator locator;
+   private ClientSessionFactoryInternal sessionFactory;
+   private final XARecoveryConfig config;
+   private final AtomicInteger usage = new AtomicInteger(0);
+   private boolean started = false;
+
+
+   public RecoveryDiscovery(XARecoveryConfig config)
+   {
+      this.config = config;
+   }
+
+   public synchronized void start(boolean retry)
+   {
+      if (!started)
+      {
+         HornetQJMSServerLogger.LOGGER.debug("Starting RecoveryDiscovery on " + config);
+         started = true;
+
+         locator = config.createServerLocator();
+         locator.disableFinalizeCheck();
+         locator.addClusterTopologyListener(new InternalListener(config));
+         try
+         {
+            sessionFactory = (ClientSessionFactoryInternal) locator.createSessionFactory();
+            // We are using the SessionFactoryInternal here directly as we don't have information to connect with an user and password
+            // on the session as all we want here is to get the topology
+            // in case of failure we will retry
+            sessionFactory.addFailureListener(this);
+
+            HornetQJMSServerLogger.LOGGER.debug("RecoveryDiscovery started fine on " + config);
+         }
+         catch (Exception startupError)
+         {
+            if (!retry)
+            {
+               HornetQJMSServerLogger.LOGGER.xaRecoveryStartError(config);
+            }
+            stop();
+            HornetQRecoveryRegistry.getInstance().failedDiscovery(this);
+         }
+
+      }
+   }
+
+   public synchronized void stop()
+   {
+      internalStop();
+   }
+
+   /**
+    * we may have several connection factories referencing the same connection recovery entry.
+    * Because of that we need to make a count of the number of the instances that are referencing it,
+    * so we will remove it as soon as we are done
+    */
+   public int incrementUsage()
+   {
+      return usage.decrementAndGet();
+   }
+
+   public int decrementUsage()
+   {
+      return usage.incrementAndGet();
+   }
+
+
+   @Override
+   protected void finalize()
+   {
+      // I don't think it's a good thing to synchronize a method on a finalize,
+      // hence the internalStop (no sync) call here
+      internalStop();
+   }
+
+   protected void internalStop()
+   {
+      if (started)
+      {
+         started = false;
+         try
+         {
+            if (sessionFactory != null)
+            {
+               sessionFactory.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+            HornetQJMSServerLogger.LOGGER.debug(ignored, ignored);
+         }
+
+         try
+         {
+            locator.close();
+         }
+         catch (Exception ignored)
+         {
+            HornetQJMSServerLogger.LOGGER.debug(ignored, ignored);
+         }
+
+         sessionFactory = null;
+         locator = null;
+      }
+   }
+
+
+   static final class InternalListener implements ClusterTopologyListener
+   {
+      private final XARecoveryConfig config;
+
+      public InternalListener(final XARecoveryConfig config)
+      {
+         this.config = config;
+      }
+
+      @Override
+      public void nodeUP(TopologyMember topologyMember, boolean last)
+      {
+         // There is a case where the backup announce itself,
+         // we need to ignore a case where getLive is null
+         if (topologyMember.getLive() != null)
+         {
+            Pair<TransportConfiguration, TransportConfiguration> connector =
+               new Pair<TransportConfiguration, TransportConfiguration>(topologyMember.getLive(),
+                                                                        topologyMember.getBackup());
+            HornetQRecoveryRegistry.getInstance().nodeUp(topologyMember.getNodeId(), connector,
+                                                         config.getUsername(), config.getPassword());
+         }
+      }
+
+      @Override
+      public void nodeDown(long eventUID, String nodeID)
+      {
+         // I'm not putting any node down, since it may have previous transactions hanging, however at some point we may
+         //change it have some sort of timeout for removal
+      }
+
+   }
+
+
+   @Override
+   public void connectionFailed(HornetQException exception, boolean failedOver)
+   {
+      if (exception.getType() == HornetQExceptionType.DISCONNECTED)
+      {
+         HornetQJMSServerLogger.LOGGER.warn("being disconnected for server shutdown", exception);
+      }
+      else
+      {
+         HornetQJMSServerLogger.LOGGER.warn("Notified of connection failure in xa discovery, we will retry on the next recovery",
+                                            exception);
+      }
+      internalStop();
+      HornetQRecoveryRegistry.getInstance().failedDiscovery(this);
+   }
+
+   @Override
+   public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID)
+   {
+      connectionFailed(me, failedOver);
+   }
+
+   @Override
+   public void beforeReconnect(HornetQException exception)
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "RecoveryDiscovery [config=" + config + ", started=" + started + "]";
+   }
+
+   @Override
+   public int hashCode()
+   {
+      return config.hashCode();
+   }
+
+   @Override
+   public boolean equals(Object o)
+   {
+      if (o == null || (!(o instanceof RecoveryDiscovery)))
+      {
+         return false;
+      }
+      RecoveryDiscovery discovery = (RecoveryDiscovery) o;
+
+      return config.equals(discovery.config);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java
new file mode 100644
index 0000000..7319f2b
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/XARecoveryConfig.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.server.recovery;
+
+import java.util.Arrays;
+
+import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.HornetQClient;
+import org.apache.activemq6.api.core.client.ServerLocator;
+import org.apache.activemq6.jms.client.HornetQConnectionFactory;
+
+/**
+ *
+ * This represents the configuration of a single connection factory.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * @author Clebert Suconic
+ *
+ * A wrapper around info needed for the xa recovery resource
+ *         Date: 3/23/11
+ *         Time: 10:15 AM
+ */
+public class XARecoveryConfig
+{
+
+   private final boolean ha;
+   private final TransportConfiguration[] transportConfiguration;
+   private final DiscoveryGroupConfiguration discoveryConfiguration;
+   private final String username;
+   private final String password;
+
+   public static XARecoveryConfig newConfig(HornetQConnectionFactory factory,
+                                            String userName,
+                                            String password)
+   {
+      if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null)
+      {
+         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password);
+      }
+      else
+      {
+         return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password);
+      }
+
+   }
+
+   public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
+   {
+      this.transportConfiguration = transportConfiguration;
+      this.discoveryConfiguration = null;
+      this.username = username;
+      this.password = password;
+      this.ha = ha;
+   }
+
+   public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
+   {
+      this.discoveryConfiguration = discoveryConfiguration;
+      this.transportConfiguration = null;
+      this.username = username;
+      this.password = password;
+      this.ha = ha;
+   }
+
+   public boolean isHA()
+   {
+      return ha;
+   }
+
+   public DiscoveryGroupConfiguration getDiscoveryConfiguration()
+   {
+      return discoveryConfiguration;
+   }
+
+   public TransportConfiguration[] getTransportConfig()
+   {
+      return transportConfiguration;
+   }
+
+   public String getUsername()
+   {
+      return username;
+   }
+
+   public String getPassword()
+   {
+      return password;
+   }
+
+
+   /**
+    * Create a serverLocator using the configuration
+    * @return locator
+    */
+   public ServerLocator createServerLocator()
+   {
+      if (getDiscoveryConfiguration() != null)
+      {
+         return HornetQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
+      }
+      else
+      {
+         return HornetQClient.createServerLocator(isHA(), getTransportConfig());
+      }
+
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode());
+      result = prime * result + Arrays.hashCode(transportConfiguration);
+      return result;
+   }
+
+   /*
+    * We don't use username and password on purpose.
+    * Just having the connector is enough, as we don't want to duplicate resources just because of usernames
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (obj == null)
+         return false;
+      if (getClass() != obj.getClass())
+         return false;
+      XARecoveryConfig other = (XARecoveryConfig)obj;
+      if (discoveryConfiguration == null)
+      {
+         if (other.discoveryConfiguration != null)
+            return false;
+      }
+      else if (!discoveryConfiguration.equals(other.discoveryConfiguration))
+         return false;
+      if (!Arrays.equals(transportConfiguration, other.transportConfiguration))
+         return false;
+      return true;
+   }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) +
+             ", discoveryConfiguration = " + discoveryConfiguration +
+             ", username=" +
+             username +
+             ", password=****]";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java
new file mode 100644
index 0000000..efcf98a
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/server/recovery/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ * This package is used to locate resources and connectors along the cluster set
+ * I - JCA Connection Factories or InBound MDBs will call HornetQRegistryBase::register(XARecoveryConfig)
+ * II - For each XARecoveryConfig the RegistryBase will instantiate a ResourceDiscoveryUnit which will
+ *      connect using that configuration and inform the Registry of any topology members
+ * III - For each topology member found on the DiscoveryUnits, the RegistryBase will registry a HornetQResourceRecovery
+ *       that will exist per server
+  */
+package org.apache.activemq6.jms.server.recovery;
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java
new file mode 100644
index 0000000..d3ef9b1
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/transaction/JMSTransactionDetail.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.transaction;
+
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq6.core.server.ServerMessage;
+import org.apache.activemq6.core.transaction.Transaction;
+import org.apache.activemq6.core.transaction.TransactionDetail;
+import org.apache.activemq6.jms.client.HornetQBytesMessage;
+import org.apache.activemq6.jms.client.HornetQMapMessage;
+import org.apache.activemq6.jms.client.HornetQMessage;
+import org.apache.activemq6.jms.client.HornetQObjectMessage;
+import org.apache.activemq6.jms.client.HornetQStreamMessage;
+import org.apache.activemq6.jms.client.HornetQTextMessage;
+
+/**
+ * A JMSTransactionDetail
+ *
+ * @author <a href="tm.igarashi@gmail.com">Tomohisa Igarashi</a>
+ *
+ *
+ */
+public class JMSTransactionDetail extends TransactionDetail
+{
+   public JMSTransactionDetail(Xid xid, Transaction tx, Long creation) throws Exception
+   {
+      super(xid,tx,creation);
+   }
+
+   @Override
+   public String decodeMessageType(ServerMessage msg)
+   {
+      int type = msg.getType();
+      switch (type)
+      {
+         case HornetQMessage.TYPE: // 0
+            return "Default";
+         case HornetQObjectMessage.TYPE: // 2
+            return "ObjectMessage";
+         case HornetQTextMessage.TYPE: // 3
+            return "TextMessage";
+         case HornetQBytesMessage.TYPE: // 4
+            return "ByteMessage";
+         case HornetQMapMessage.TYPE: // 5
+            return "MapMessage";
+         case HornetQStreamMessage.TYPE: // 6
+            return "StreamMessage";
+         default:
+            return "(Unknown Type)";
+      }
+   }
+
+   @Override
+   public Map<String, Object> decodeMessageProperties(ServerMessage msg)
+   {
+      try
+      {
+         return HornetQMessage.coreMaptoJMSMap(msg.toMap());
+      }
+      catch (Throwable t)
+      {
+         return null;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd b/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd
new file mode 100644
index 0000000..9add1f9
--- /dev/null
+++ b/activemq6-jms-server/src/main/resources/schema/hornetq-jms.xsd
@@ -0,0 +1,267 @@
+<?xml version='1.0' encoding='UTF-8'?>
+
+<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+            targetNamespace="urn:hornetq"
+            xmlns="urn:hornetq"
+            xmlns:hq="urn:org.hornetq"
+            elementFormDefault="qualified"
+            attributeFormDefault="unqualified"
+            version="1.0">
+
+   <xsd:element name="configuration" hq:schema="hornetq-jms-configuration">
+   	<xsd:complexType>
+   	  <xsd:sequence>
+            <xsd:element name="jmx-domain" type="xsd:string" default="org.hornetq"
+                         minOccurs="0" maxOccurs="1"/>
+   	    <xsd:element ref="connection-factory" maxOccurs="unbounded" minOccurs="0"/>
+   	    <xsd:choice maxOccurs="unbounded" minOccurs="0">
+   	      <xsd:element ref="queue" maxOccurs="1" minOccurs="1"/>
+              <xsd:element ref="topic" maxOccurs="1" minOccurs="1"/>
+            </xsd:choice>
+          </xsd:sequence>
+   	</xsd:complexType>
+   </xsd:element>
+
+   <xsd:element name="connection-factory">
+        <xsd:annotation hq:linkend="using-jms.server.configuration">
+          <xsd:documentation>a list of connection factories to create and add to
+          JNDI</xsd:documentation>
+        </xsd:annotation>
+   	<xsd:complexType>
+          <xsd:all>
+            <xsd:element name="xa" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:linkend="using-jms.configure.factory.types"
+                              hq:id="configuration.connection-factory.signature.xa">
+                <xsd:documentation>Whether this is an XA connection factory</xsd:documentation>
+              </xsd:annotation>
+            </xsd:element>
+            <xsd:element name="discovery-group-ref" type="discovery-group-refType" maxOccurs="1" minOccurs="0">
+            </xsd:element>
+
+            <xsd:element name="connectors" maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:linkend="clusters">
+                <xsd:documentation>A sequence of connectors used by the connection factory
+                </xsd:documentation>
+              </xsd:annotation>
+            <xsd:complexType>
+                <xsd:sequence>
+                  <xsd:element name="connector-ref" maxOccurs="unbounded" minOccurs="1">
+                    <xsd:annotation>
+                      <xsd:documentation>A connector reference
+                      </xsd:documentation>
+                    </xsd:annotation>
+                    <xsd:complexType>
+                      <xsd:attribute name="connector-name" type="xsd:string" use="required">
+                        <xsd:annotation>
+                          <xsd:documentation>Name of the connector to connect to the live server
+                          </xsd:documentation>
+                        </xsd:annotation>
+                      </xsd:attribute>
+                    </xsd:complexType>
+                  </xsd:element>
+                </xsd:sequence>
+              </xsd:complexType>
+              </xsd:element>
+
+            <xsd:element name="entries" maxOccurs="1" minOccurs="0">
+              <xsd:complexType>
+                <xsd:sequence>
+                  <xsd:element name="entry" type="entryType" maxOccurs="unbounded" minOccurs="1">
+                  </xsd:element>
+                </xsd:sequence>
+              </xsd:complexType>
+            </xsd:element>
+
+            <xsd:element name="client-failure-check-period" type="xsd:long" default="30000"
+                         maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:id="configuration.connection-factory.client-failure-check-period"
+                              hq:linkend="dead.connections" hq:default="(ms)">
+                <xsd:documentation>
+                  the period (in ms) after which the client will consider the connection failed
+                  after not receiving packets from the server. -1 disables this setting.
+                </xsd:documentation>
+              </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="connection-ttl" type="xsd:long" maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:id="configuration.connection-factory.connection-ttl"
+                              hq:linkend="dead.connections">
+                <xsd:documentation>the time to live (in ms) for connections
+                </xsd:documentation>
+              </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="call-timeout" type="xsd:long" default="30000"
+                         maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:id="configuration.connection-factory.call-timeout">
+                <xsd:documentation>
+                  the timeout (in ms) for remote calls
+                </xsd:documentation>
+              </xsd:annotation>
+            </xsd:element>
+   			 <xsd:element name="call-failover-timeout" type="xsd:long"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="consumer-window-size" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="consumer-max-rate" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="confirmation-window-size" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="producer-window-size" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="producer-max-rate" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="cache-large-message-client" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="min-large-message-size" type="xsd:long"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="compress-large-messages" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+
+            <xsd:element name="client-id" type="xsd:string" maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:id="configuration.connection-factory.client-id"
+                              hq:linkend="using-jms.clientid">
+                <xsd:documentation>
+                  the pre-configured client ID for the connection factory
+                </xsd:documentation>
+              </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="dups-ok-batch-size" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="transaction-batch-size" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+         <xsd:element name="block-on-acknowledge" type="xsd:boolean" default="false"
+                      maxOccurs="1" minOccurs="0">
+           <xsd:annotation hq:linkend="send-guarantees.nontrans.acks"
+                           hq:id="configuration.connection-factory.block-on-acknowledge">
+             <xsd:documentation>
+               whether or not messages are acknowledged synchronously
+             </xsd:documentation>
+           </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="block-on-non-durable-send" type="xsd:boolean" default="false"
+                      maxOccurs="1" minOccurs="0">
+           <xsd:annotation hq:id="configuration.connection-factory.block-on-non-durable-send"
+                           hq:linkend="non-transactional-sends">
+             <xsd:documentation>
+               whether or not non-durable messages are sent synchronously
+             </xsd:documentation>
+           </xsd:annotation>
+         </xsd:element>
+            <xsd:element name="block-on-durable-send" type="xsd:boolean" default="true"
+                         maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:id="configuration.connection-factory.block-on-durable-send"
+                              hq:linkend="non-transactional-sends">
+                <xsd:documentation>
+                  whether or not durable messages are sent synchronously
+                </xsd:documentation>
+              </xsd:annotation>
+            </xsd:element>
+            <xsd:element name="auto-group" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+              <xsd:annotation hq:id="configuration.connection-factory.auto-group"
+                              hq:linkend="message-grouping.jmsconfigure">
+                <xsd:documentation>whether or not message grouping is automatically used
+                </xsd:documentation>
+              </xsd:annotation>
+            </xsd:element>
+            <xsd:element name="pre-acknowledge" type="xsd:boolean"
+   				maxOccurs="1" minOccurs="0">
+   			</xsd:element>
+            <xsd:element name="retry-interval" type="xsd:long"
+   				maxOccurs="1" minOccurs="0">
+   			</xsd:element>
+   			<xsd:element name="retry-interval-multiplier" type="xsd:float"
+   				maxOccurs="1" minOccurs="0">
+   			</xsd:element>
+   			<xsd:element name="max-retry-interval" type="xsd:long"
+   				maxOccurs="1" minOccurs="0">
+   			</xsd:element>
+            <xsd:element name="reconnect-attempts" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="failover-on-initial-connection" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="failover-on-server-shutdown" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="connection-load-balancing-policy-class-name" type="xsd:string"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="use-global-pools" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="scheduled-thread-pool-max-size" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="thread-pool-max-size" type="xsd:int"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="group-id" type="xsd:string"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+            <xsd:element name="ha" type="xsd:boolean"
+                maxOccurs="1" minOccurs="0">
+            </xsd:element>
+   		</xsd:all>
+   		<xsd:attribute name="name" type="xsd:string"></xsd:attribute>
+   		<xsd:attribute name="signature" type="xsd:string">
+                  <xsd:annotation hq:id="configuration.connection-factory.signature"
+                                  hq:linkend="using-jms.configure.factory.types"
+                                  hq:default="generic"> <!-- XXX -->
+                    <xsd:documentation>Type of connection factory</xsd:documentation>
+                  </xsd:annotation>
+                </xsd:attribute>
+   	</xsd:complexType>
+   </xsd:element>
+
+    <xsd:complexType name="entryType">
+    	<xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute>
+    </xsd:complexType>
+
+    <xsd:complexType name="discovery-group-refType">
+    	<xsd:attribute name="discovery-group-name" type="xsd:string" use="required">
+          <xsd:annotation>
+          <xsd:documentation>
+            Name of discovery group used by this connection factory
+          </xsd:documentation>
+          </xsd:annotation>
+        </xsd:attribute>
+    </xsd:complexType>
+
+    <xsd:element name="queue" type="queueType"></xsd:element>
+
+    <xsd:element name="topic" type="topicType"></xsd:element>
+
+   <xsd:complexType name="queueType">
+    	<xsd:sequence>
+    		<xsd:element name="entry" type="entryType" maxOccurs="unbounded" minOccurs="1"></xsd:element>
+          <xsd:element name="selector" maxOccurs="1" minOccurs="0">
+                <xsd:complexType>
+                   <xsd:attribute name="string" type="xsd:string" use="required"></xsd:attribute>
+                </xsd:complexType>
+            </xsd:element>
+            <xsd:element name="durable" type="xsd:boolean" maxOccurs="1" minOccurs="0"></xsd:element>
+    	</xsd:sequence>
+    	<xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute>
+    </xsd:complexType>
+
+    <xsd:complexType name="topicType">
+    	<xsd:sequence>
+    		<xsd:element name="entry" type="entryType" maxOccurs="unbounded" minOccurs="1"></xsd:element>
+    	</xsd:sequence>
+    	<xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute>
+    </xsd:complexType>
+</xsd:schema>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/pom.xml
----------------------------------------------------------------------
diff --git a/activemq6-journal/pom.xml b/activemq6-journal/pom.xml
new file mode 100644
index 0000000..efd1912
--- /dev/null
+++ b/activemq6-journal/pom.xml
@@ -0,0 +1,92 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq6</groupId>
+      <artifactId>activemq6-pom</artifactId>
+      <version>6.0.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>activemq6-journal</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ6 Journal</name>
+
+   <properties>
+      <hornetq.basedir>${project.basedir}/..</hornetq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging-processor</artifactId>
+      </dependency>
+
+      <!--
+          JBoss Logging
+      -->
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logmanager</groupId>
+         <artifactId>jboss-logmanager</artifactId>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-commons</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-native</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <!-- needed to compile the tests -->
+      <dependency>
+         <groupId>junit</groupId>
+         <artifactId>junit</artifactId>
+         <scope>test</scope>
+      </dependency>
+   </dependencies>
+
+   <profiles>
+      <profile>
+         <id>release</id>
+         <build>
+            <plugins>
+               <plugin>
+                  <groupId>org.apache.maven.plugins</groupId>
+                  <artifactId>maven-javadoc-plugin</artifactId>
+                  <version>2.9</version>
+                  <configuration>
+                     <doclet>org.jboss.apiviz.APIviz</doclet>
+                     <docletArtifact>
+                        <groupId>org.jboss.apiviz</groupId>
+                        <artifactId>apiviz</artifactId>
+                        <version>1.3.2.GA</version>
+                     </docletArtifact>
+                     <useStandardDocletOptions>true</useStandardDocletOptions>
+                     <minmemory>128m</minmemory>
+                     <maxmemory>512m</maxmemory>
+                     <quiet>false</quiet>
+                     <aggregate>true</aggregate>
+                     <excludePackageNames>org.hornetq.core:org.hornetq.utils</excludePackageNames>
+                  </configuration>
+                  <executions>
+                     <execution>
+                        <id>javadocs</id>
+                        <goals>
+                           <goal>jar</goal>
+                        </goals>
+                     </execution>
+                  </executions>
+               </plugin>
+            </plugins>
+         </build>
+      </profile>
+   </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java
new file mode 100644
index 0000000..d92fedb
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AIOCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.asyncio;
+
+/**
+ * The interface used for AIO Callbacks.
+ * @author clebert.suconic@jboss.com
+ *
+ */
+public interface AIOCallback
+{
+   /**
+    * Method for sync notifications. When this callback method is called, there is a guarantee the data is written on the disk.
+    * <br><b>Note:</b><i>Leave this method as soon as possible, or you would be blocking the whole notification thread</i> */
+   void done();
+
+   /**
+    * Method for error notifications.
+    * Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/
+   void onError(int errorCode, String errorMessage);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java
new file mode 100644
index 0000000..fad8661
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/AsynchronousFile.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.asyncio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.api.core.HornetQException;
+
+/**
+ *
+ * @author clebert.suconic@jboss.com
+ *
+ */
+public interface AsynchronousFile
+{
+   void close() throws InterruptedException, HornetQException;
+
+   /**
+    *
+    * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error
+    * @param fileName
+    * @param maxIO The number of max concurrent asynchronous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+    * @throws HornetQException
+    */
+   void open(String fileName, int maxIO) throws HornetQException;
+
+   /**
+    * Warning: This function will perform a synchronous IO, probably translating to a fstat call
+    * @throws HornetQException
+    * */
+   long size() throws HornetQException;
+
+   /** Any error will be reported on the callback interface */
+   void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
+
+   /**
+    * Performs an internal direct write.
+    * @throws HornetQException
+    */
+   void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
+
+   void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException;
+
+   void fill(long position, int blocks, long size, byte fillChar) throws HornetQException;
+
+   void setBufferCallback(BufferCallback callback);
+
+   int getBlockSize();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java
new file mode 100644
index 0000000..189fb40
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/BufferCallback.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.asyncio;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * Used to receive a notification on completed buffers used by the AIO layer.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface BufferCallback
+{
+   void bufferDone(ByteBuffer buffer);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java
new file mode 100644
index 0000000..892c1d0
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/asyncio/IOExceptionListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.asyncio;
+
+/**
+ * A IOExceptionListener
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface IOExceptionListener
+{
+   void onIOException(Exception exception, String message);
+}


Mime
View raw message