activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [40/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:10 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java
new file mode 100644
index 0000000..9601f5e
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.reader;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class TextMessageUtil extends MessageUtil
+{
+
+   /**
+    * Utility method to set the Text message on a message body
+    */
+   public static void writeBodyText(Message message, SimpleString text)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.clear();
+      buff.writeNullableSimpleString(text);
+   }
+
+   /**
+    * Utility method to read the Text message from a message body
+    */
+   public static SimpleString readBodyText(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.resetReaderIndex();
+      return buff.readNullableSimpleString();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java b/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java
new file mode 100644
index 0000000..5e75719
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/**
+ * Provides reading methods for JMS like objects.
+ * This isolates the logic from the JMS client in case you need this kind of functionality from core.
+ * This is also used on conversions between protocols such as AMQP and Core where
+ * this is done on the server side.
+ * @author Clebert Suconic
+ */
+package org.hornetq.reader;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java
new file mode 100644
index 0000000..f5a910e
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.protocol;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.HornetQInterruptedException;
+import org.hornetq.core.client.HornetQClientLogger;
+import org.hornetq.core.client.HornetQClientMessageBundle;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public abstract class AbstractRemotingConnection implements RemotingConnection
+{
+   protected final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+   protected final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>();
+   protected final Connection transportConnection;
+   protected final Executor executor;
+   protected final long creationTime;
+   protected volatile boolean dataReceived;
+
+   public AbstractRemotingConnection(final Connection transportConnection, final Executor executor)
+   {
+      this.transportConnection = transportConnection;
+      this.executor = executor;
+      this.creationTime = System.currentTimeMillis();
+   }
+
+   public List<FailureListener> getFailureListeners()
+   {
+      return new ArrayList<FailureListener>(failureListeners);
+   }
+
+   protected void callFailureListeners(final HornetQException me, String scaleDownTargetNodeID)
+   {
+      final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
+
+      for (final FailureListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionFailed(me, false, scaleDownTargetNodeID);
+         }
+         catch (HornetQInterruptedException interrupted)
+         {
+            // this is an expected behaviour.. no warn or error here
+            HornetQClientLogger.LOGGER.debug("thread interrupted", interrupted);
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others
+            // from
+            // executing
+            HornetQClientLogger.LOGGER.errorCallingFailureListener(t);
+         }
+      }
+   }
+
+
+   protected void callClosingListeners()
+   {
+      final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
+
+      for (final CloseListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionClosed();
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others
+            // from
+            // executing
+            HornetQClientLogger.LOGGER.errorCallingFailureListener(t);
+         }
+      }
+   }
+
+   public void setFailureListeners(final List<FailureListener> listeners)
+   {
+      failureListeners.clear();
+
+      failureListeners.addAll(listeners);
+   }
+
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   public String getRemoteAddress()
+   {
+      return transportConnection.getRemoteAddress();
+   }
+
+   public void addFailureListener(final FailureListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull();
+      }
+      failureListeners.add(listener);
+   }
+
+   public boolean removeFailureListener(final FailureListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull();
+      }
+
+      return failureListeners.remove(listener);
+   }
+
+   public void addCloseListener(final CloseListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull();
+      }
+
+      closeListeners.add(listener);
+   }
+
+   public boolean removeCloseListener(final CloseListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull();
+      }
+
+      return closeListeners.remove(listener);
+   }
+
+   public List<CloseListener> removeCloseListeners()
+   {
+      List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
+
+      closeListeners.clear();
+
+      return ret;
+   }
+
+   public List<FailureListener> removeFailureListeners()
+   {
+      List<FailureListener> ret = getFailureListeners();
+
+      failureListeners.clear();
+
+      return ret;
+   }
+
+   public void setCloseListeners(List<CloseListener> listeners)
+   {
+      closeListeners.clear();
+
+      closeListeners.addAll(listeners);
+   }
+
+   public HornetQBuffer createBuffer(final int size)
+   {
+      return transportConnection.createBuffer(size);
+   }
+
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+
+   public boolean checkDataReceived()
+   {
+      boolean res = dataReceived;
+
+      dataReceived = false;
+
+      return res;
+   }
+
+   /*
+    * This can be called concurrently by more than one thread so needs to be locked
+    */
+   public void fail(final HornetQException me)
+   {
+      fail(me, null);
+   }
+
+   public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+   {
+      dataReceived = true;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java
index 1d1219f..5b60aac 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java
@@ -32,6 +32,11 @@ public class ConnectionEntry
 
    public final Executor connectionExecutor;
 
+   public Object getID()
+   {
+      return connection.getID();
+   }
+
    public ConnectionEntry(final RemotingConnection connection, final Executor connectionExecutor, final long lastCheck, final long ttl)
    {
       this.connection = connection;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
index 7e4114f..932f771 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
@@ -24,6 +24,10 @@ import org.hornetq.spi.core.remoting.Connection;
 /**
  * A RemotingConnection is a connection between a client and a server.
  *
+ *
+ * Perhaps a better name for this class now would be ProtocolConnection as this
+ * represents the link with the used protocol
+ *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
  */

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java
index db552a2..e56f1ac 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java
@@ -14,8 +14,6 @@ package org.hornetq.spi.core.remoting;
 
 import java.util.Map;
 
-import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManagerFactory;
-
 /**
  * Abstract connector
  *
@@ -25,16 +23,8 @@ public abstract class AbstractConnector implements Connector
 {
    protected final Map<String, Object> configuration;
 
-   private static final ClientProtocolManagerFactory protocolWrapperFactory = new HornetQClientProtocolManagerFactory();
-
    protected AbstractConnector(Map<String, Object> configuration)
    {
       this.configuration = configuration;
    }
-
-   public ClientProtocolManagerFactory getProtocolManagerFactory()
-   {
-      return protocolWrapperFactory;
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java
index dc614b4..8ae772b 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java
@@ -16,8 +16,10 @@ package org.hornetq.spi.core.remoting;
 import java.util.List;
 import java.util.concurrent.locks.Lock;
 
+import io.netty.channel.ChannelPipeline;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 
 /**
@@ -29,12 +31,10 @@ public interface ClientProtocolManager
 
    /// Life Cycle Methods:
 
-   RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout,  List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, ProtocolResponseHandler protocolResponseHandler);
+   RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler);
 
    RemotingConnection getCurrentConnection();
 
-   void setResponseHandler(ProtocolResponseHandler handler);
-
    Lock lockSessionCreation();
 
    boolean waitOnLatch(long milliseconds) throws InterruptedException;
@@ -49,11 +49,10 @@ public interface ClientProtocolManager
 
    /// Sending methods
 
+   void addChannelHandlers(ChannelPipeline pipeline);
 
    void sendSubscribeTopology(boolean isServer);
 
-   void shakeHands();
-
    void ping(long connectionTTL);
 
    SessionContext createSessionContext(final String name,
@@ -66,7 +65,14 @@ public interface ClientProtocolManager
                                        int minLargeMessageSize,
                                        int confirmationWindowSize) throws HornetQException;
 
-   boolean cleanupBeforeFailover();
+   boolean cleanupBeforeFailover(HornetQException cause);
 
    boolean checkForFailover(String liveNodeID) throws HornetQException;
+
+   void setSessionFactory(ClientSessionFactory factory);
+
+   ClientSessionFactory getSessionFactory();
+
+   String getName();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java
index 0c40e86..448f92d 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java
@@ -12,12 +12,13 @@
  */
 package org.hornetq.spi.core.remoting;
 
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import java.io.Serializable;
 
 /**
  * @author Clebert Suconic
  */
-public interface ClientProtocolManagerFactory
+public interface ClientProtocolManagerFactory extends Serializable
 {
-   ClientProtocolManager newProtocolManager(ClientSessionFactoryInternal factoryInternal);
+
+   ClientProtocolManager newProtocolManager();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java
index 1df470b..69c338b 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java
@@ -12,14 +12,17 @@
  */
 package org.hornetq.spi.core.remoting;
 
+import io.netty.channel.ChannelFutureListener;
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.security.HornetQPrincipal;
+import org.hornetq.spi.core.protocol.RemotingConnection;
 
 /**
  * The connection used by a channel to write data to.
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
  */
 public interface Connection
 {
@@ -31,6 +34,11 @@ public interface Connection
     */
    HornetQBuffer createBuffer(int size);
 
+
+   RemotingConnection getProtocolConnection();
+
+   void setProtocolConnection(RemotingConnection connection);
+
    /**
     * returns the unique id of this wire.
     *
@@ -48,12 +56,29 @@ public interface Connection
    void write(HornetQBuffer buffer, boolean flush, boolean batched);
 
    /**
+    * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
+    *
+    * @param buffer the buffer to write
+    * @param flush  whether to flush the buffers onto the wire
+    * @param batched whether the packet is allowed to be batched for better performance
+    */
+   void write(HornetQBuffer buffer, boolean flush, boolean batched, ChannelFutureListener futureListener);
+
+   /**
     * writes the buffer to the connection with no flushing or batching
     *
     * @param buffer the buffer to write
     */
    void write(HornetQBuffer buffer);
 
+
+   /**
+    * This should close the internal channel without calling any listeners.
+    * This is to avoid a situation where the broker is busy writing on an internal thread.
+    * This should close the socket releasing any pending threads.
+    */
+   void forceClose();
+
    /**
     * Closes the connection.
     */
@@ -77,9 +102,16 @@ public interface Connection
    /**
     * Generates a {@link TransportConfiguration} to be used to connect to the same target this is
     * connected to.
-    * @return TranportConfiguration
+    * @return TransportConfiguration
     */
    TransportConfiguration getConnectorConfig();
 
    HornetQPrincipal getDefaultHornetQPrincipal();
+
+   /**
+    * The InVM Connection has some special handling as it doesn't use the Netty ProtocolChannel;
+    * we will use this method instead of using instanceof.
+    * @return
+    */
+   boolean isUsingProtocolHandling();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java
index 99dfa43..474f189 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java
@@ -50,8 +50,6 @@ public interface Connector
     */
    Connection createConnection();
 
-   ClientProtocolManagerFactory getProtocolManagerFactory();
-
    /**
     * If the configuration is equivalent to this connector, which means
     * if the parameter configuration is used to create a connection to a target

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java
index d77b107..5d0b458 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java
@@ -17,6 +17,8 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.api.core.TransportConfigurationHelper;
+
 /**
  * A ConnectorFactory is used by the client for creating connectors.
  * <p>
@@ -24,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
  *
  * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
  */
-public interface ConnectorFactory
+public interface ConnectorFactory extends TransportConfigurationHelper
 {
    /**
     * creates a new instance of a connector.
@@ -42,7 +44,8 @@ public interface ConnectorFactory
                              ConnectionLifeCycleListener listener,
                              Executor closeExecutor,
                              Executor threadPool,
-                             ScheduledExecutorService scheduledThreadPool);
+                             ScheduledExecutorService scheduledThreadPool,
+                             ClientProtocolManager protocolManager);
 
    /**
     * Returns the allowable properties for this connector.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java
new file mode 100644
index 0000000..082bcd4
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public abstract class ConsumerContext
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java
deleted file mode 100644
index e5a7c91..0000000
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.spi.core.remoting;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.spi.core.protocol.RemotingConnection;
-
-/**
- * @author Clebert Suconic
- */
-
-public interface ProtocolResponseHandler
-{
-   // This is sent when the server is telling the client the node is being disconnected
-   void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID);
-
-   void notifyNodeUp(long uniqueEventID,
-                     final String backupGroupName,
-                     final String scaleDownGroupName,
-                     final String nodeName,
-                     final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                     final boolean isLast);
-
-   // This is sent when any node on the cluster topology is going down
-   void notifyNodeDown(final long eventTime, final String nodeID);
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java
index af6e2f0..87d97de 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java
@@ -27,6 +27,7 @@ import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientLargeMessageInternal;
 import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientProducerCreditsImpl;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -41,8 +42,6 @@ public abstract class SessionContext
 {
    protected ClientSessionInternal session;
 
-   protected final String name;
-
    protected SendAcknowledgementHandler sendAckHandler;
 
    protected volatile RemotingConnection remotingConnection;
@@ -50,10 +49,9 @@ public abstract class SessionContext
    protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
 
 
-   public SessionContext(final String name, RemotingConnection remotingConnection)
+   public SessionContext(RemotingConnection remotingConnection)
    {
       this.remotingConnection = remotingConnection;
-      this.name = name;
    }
 
 
@@ -88,7 +86,7 @@ public abstract class SessionContext
 
    public abstract boolean supportsLargeMessage();
 
-   protected void handleReceiveLargeMessage(long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
+   protected void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
    {
       ClientSessionInternal session = this.session;
       if (session != null)
@@ -97,7 +95,7 @@ public abstract class SessionContext
       }
    }
 
-   protected void handleReceiveMessage(final long consumerID, final ClientMessageInternal message) throws Exception
+   protected void handleReceiveMessage(ConsumerContext consumerID, final ClientMessageInternal message) throws Exception
    {
 
       ClientSessionInternal session = this.session;
@@ -107,7 +105,7 @@ public abstract class SessionContext
       }
    }
 
-   protected void handleReceiveContinuation(final long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
+   protected void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
    {
       ClientSessionInternal session = this.session;
       if (session != null)
@@ -138,7 +136,7 @@ public abstract class SessionContext
 
    public abstract int getCreditsOnSendingFull(MessageInternal msgI);
 
-   public abstract void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler) throws HornetQException;
+   public abstract void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException;
 
    /**
     * it should return the number of credits (or bytes) used to send this packet
@@ -252,7 +250,7 @@ public abstract class SessionContext
    /**
     * Interrupt and return any blocked calls
     */
-   public abstract void returnBlocking();
+   public abstract void returnBlocking(HornetQException cause);
 
    /**
     * it will lock the communication channel of the session avoiding anything to come while failover is happening.
@@ -266,4 +264,5 @@ public abstract class SessionContext
    public abstract void cleanup();
 
 
+   public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java
new file mode 100644
index 0000000..f7b99ec
--- /dev/null
+++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.spi.core.remoting;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public interface TopologyResponseHandler
+{
+   // This is sent when the server is telling the client the node is being disconnected
+   void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID);
+
+   void notifyNodeUp(long uniqueEventID,
+                     final String backupGroupName,
+                     final String scaleDownGroupName,
+                     final String nodeName,
+                     final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                     final boolean isLast);
+
+   // This is sent when any node on the cluster topology is going down
+   void notifyNodeDown(final long eventTime, final String nodeID);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java
----------------------------------------------------------------------
diff --git a/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java b/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java
index 70facaf..51a32a0 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java
@@ -22,7 +22,19 @@ import java.util.concurrent.TimeUnit;
  */
 public class FutureLatch implements Runnable
 {
-   private final CountDownLatch latch = new CountDownLatch(1);
+   private final CountDownLatch latch;
+
+   public FutureLatch()
+   {
+      super();
+      latch = new CountDownLatch(1);
+   }
+
+   public FutureLatch(int latches)
+   {
+      super();
+      latch =  new CountDownLatch(latches);
+   }
 
    public boolean await(final long timeout)
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/pom.xml
----------------------------------------------------------------------
diff --git a/hornetq-dto/pom.xml b/hornetq-dto/pom.xml
new file mode 100644
index 0000000..ed253c8
--- /dev/null
+++ b/hornetq-dto/pom.xml
@@ -0,0 +1,147 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.hornetq</groupId>
+      <artifactId>hornetq-pom</artifactId>
+      <version>2.5.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>hornetq-dto</artifactId>
+   <packaging>jar</packaging>
+   <name>HornetQ DTO</name>
+
+   <properties>
+     <hornetq.basedir>${project.basedir}/..</hornetq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${jackson-databind.version}</version>
+      </dependency>
+   </dependencies>
+
+   <build>
+     <resources>
+       <resource>
+         <directory>target/schema</directory>
+         <includes>
+           <include>**/*</include>
+         </includes>
+       </resource>
+       <resource>
+         <directory>src/main/resources</directory>
+         <includes>
+           <include>**/*</include>
+         </includes>
+         <filtering>true</filtering>
+       </resource>
+     </resources>
+
+     <plugins>
+       <plugin>
+         <artifactId>maven-antrun-plugin</artifactId>
+         <version>1.7</version>
+         <executions>
+           <execution>
+             <phase>generate-resources</phase>
+             <configuration>
+               <tasks>
+                 <taskdef name="schemagen" classname="com.sun.tools.jxc.SchemaGenTask"/>
+                 <mkdir dir="${project.build.directory}/schema/org/hornetq/dto"/>
+                 <echo message="Generating XSD to: ${project.build.directory}/schema/org/hornetq/dto"/>
+                 <schemagen srcdir="${basedir}/.." destdir="${project.build.directory}/schema/org/hornetq/dto"
+                            includeantruntime="false">
+                   <schema namespace="http://hornetq.org/schema" file="hornetq.xsd"/>
+                   <classpath refid="maven.compile.classpath"/>
+                   <include name="**/package-info.java"/>
+                   <include name="**/*DTO.java"/>
+                   <exclude name="**/.git/**"/>
+                   <exclude name="**/.svn/**"/>
+                 </schemagen>
+                 <copy todir="${project.build.directory}/classes">
+                   <fileset dir="${project.build.directory}/schema"/>
+                 </copy>
+               </tasks>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+         </executions>
+         <dependencies>
+            <dependency>
+               <groupId>javax.xml.bind</groupId>
+               <artifactId>jaxb-api</artifactId>
+               <version>2.2.7</version>
+            </dependency>
+            <dependency>
+               <groupId>com.sun.xml.bind</groupId>
+               <artifactId>jaxb-impl</artifactId>
+               <version>2.2.7</version>
+            </dependency>
+            <dependency>
+               <groupId>com.sun.xml.bind</groupId>
+               <artifactId>jaxb-jxc</artifactId>
+               <version>2.2.7</version>
+            </dependency>
+         </dependencies>
+       </plugin>
+     </plugins>
+   </build>
+
+   <profiles>
+     <profile>
+       <id>jdk-1.5</id>
+       <activation>
+         <jdk>1.5</jdk>
+       </activation>
+       <dependencies>
+         <dependency>
+           <groupId>javax.xml.bind</groupId>
+           <artifactId>jaxb-api</artifactId>
+           <version>${jaxb-api-version}</version>
+         </dependency>
+         <dependency>
+           <groupId>com.sun.xml.bind</groupId>
+           <artifactId>jaxb-impl</artifactId>
+           <version>${jaxb-version}</version>
+         </dependency>
+       </dependencies>
+     </profile>
+
+    <profile>
+       <id>ibmjdk</id>
+       <activation>
+         <file>
+           <exists>${java.home}/../lib/tools.jar</exists>
+         </file>
+       </activation>
+       <build>
+         <pluginManagement>
+           <plugins>
+             <plugin>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-antrun-plugin</artifactId>
+               <dependencies>
+                 <dependency>
+                   <groupId>com.sun</groupId>
+                   <artifactId>tools</artifactId>
+                   <!--the real JDK version could be 1.5 or 1.6-->
+                   <version>1.5.0</version>
+                   <scope>system</scope>
+                   <optional>true</optional>
+                   <systemPath>${java.home}/../lib/tools.jar</systemPath>
+                 </dependency>
+               </dependencies>
+             </plugin>
+           </plugins>
+         </pluginManagement>
+       </build>
+     </profile>
+   </profiles>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java
new file mode 100644
index 0000000..d9836b5
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "basic-security")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class BasicSecurityDTO extends SecurityDTO
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java
new file mode 100644
index 0000000..9c31e80
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.dto;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "broker")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+public class BrokerDTO
+{
+
+   @XmlElementRef
+   public CoreDTO core;
+
+   @XmlElementRef(required = false)
+   public JmsDTO jms;
+
+   @XmlElementRef
+   public SecurityDTO security;
+
+   @XmlElementRef
+   public NamingDTO naming;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java
new file mode 100644
index 0000000..7cd62a8
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "core")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class CoreDTO
+{
+
+   @XmlAttribute
+   public String configuration;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java
new file mode 100644
index 0000000..6990298
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "jms")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class JmsDTO
+{
+
+   @XmlAttribute
+   public String configuration;
+
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java
new file mode 100644
index 0000000..8e614ae
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.dto;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "naming")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class NamingDTO
+{
+   @XmlAttribute
+   public String bindAddress = "localhost";
+
+   @XmlAttribute
+   public int port = 1099;
+
+   @XmlAttribute
+   public String rmiBindAddress = "localhost";
+
+   @XmlAttribute
+   public int rmiPort = 1098;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java
new file mode 100644
index 0000000..bf52ae3
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.dto;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType(name = "security")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+public class SecurityDTO
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java b/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java
new file mode 100644
index 0000000..58037ff
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.dto;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.util.StreamReaderDelegate;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class XmlUtil
+{
+
+   /**
+    * Replaces ${property} placeholders with values from a properties object
+    */
+   static class PropertiesFilter extends StreamReaderDelegate
+   {
+
+      static final Pattern pattern = Pattern.compile("\\$\\{([^\\}]+)\\}");
+      private final Properties props;
+
+      public PropertiesFilter(XMLStreamReader parent, Properties props)
+      {
+         super(parent);
+         this.props = props;
+      }
+
+      public String getAttributeValue(int index)
+      {
+         return filter(super.getAttributeValue(index));
+      }
+
+      public String filter(String str)
+      {
+         int start = 0;
+         while (true)
+         {
+            Matcher matcher = pattern.matcher(str);
+            if (!matcher.find(start))
+            {
+               break;
+            }
+            String group = matcher.group(1);
+            String property = props.getProperty(group);
+            if (property != null)
+            {
+               str = matcher.replaceFirst(Matcher.quoteReplacement(property));
+            }
+            else
+            {
+               start = matcher.end();
+            }
+         }
+         return str;
+      }
+
+   }
+
+   private static final XMLInputFactory factory = XMLInputFactory.newInstance();
+
+   public static <T> T decode(Class<T> clazz, File configuration) throws Exception
+   {
+      JAXBContext jaxbContext = JAXBContext.newInstance("org.hornetq.dto");
+
+      Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
+      SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+      sf.setFeature("http://apache.org/xml/features/validation/schema-full-checking", false);
+      InputStream xsdStream = XmlUtil.class.getClassLoader().getResourceAsStream("org/hornetq/dto/hornetq.xsd");
+      StreamSource xsdSource = new StreamSource(xsdStream);
+      Schema schema = sf.newSchema(xsdSource);
+      unmarshaller.setSchema(schema);
+
+      XMLStreamReader reader = factory.createXMLStreamReader(new FileInputStream(configuration));
+      //TODO - support properties files
+      Properties props = System.getProperties();
+
+      if (props != null)
+      {
+         reader = new PropertiesFilter(reader, props);
+      }
+
+      return clazz.cast(unmarshaller.unmarshal(reader));
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java b/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java
new file mode 100644
index 0000000..4da6f69
--- /dev/null
+++ b/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ * The JAXB POJOs for the XML configuration of the HornetQ broker
+ */
+@javax.xml.bind.annotation.XmlSchema(
+        namespace = "http://hornetq.org/schema",
+        elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED)
+package org.hornetq.dto;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index
----------------------------------------------------------------------
diff --git a/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index b/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index
new file mode 100644
index 0000000..3cec94b
--- /dev/null
+++ b/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Copyright 2005-2014 Red Hat, Inc.
+## Red Hat licenses this file to you under the Apache License, version
+## 2.0 (the "License"); you may not use this file except in compliance
+## with the License.  You may obtain a copy of the License at
+##    http://www.apache.org/licenses/LICENSE-2.0
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+## implied.  See the License for the specific language governing
+## permissions and limitations under the License.
+## ---------------------------------------------------------------------------
+BrokerDTO
+CoreDTO
+JmsDTO
+SecurityDTO
+BasicSecurityDTO
+NamingDTO
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java
index b672ab7..ae7cba1 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java
@@ -277,4 +277,12 @@ public interface JMSQueueControl extends DestinationControl
    @Operation(desc = "List all the existent consumers on the Queue")
    String listConsumersAsJSON() throws Exception;
 
+   /**
+    * It will flush one cycle on the internal executors, so you can be sure that any pending tasks are done before you call
+    * any other measure.
+    * It is useful if you need the exact number of counts on a message.
+    * @throws Exception
+    */
+   void flushExecutor();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java
index d0d7f6c..f58692e 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java
@@ -268,6 +268,18 @@ public interface JMSServerControl
    boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress) throws Exception;
 
    /**
+    * Closes all the connections on this server for consumers which are consuming from a queue associated with a particular address.
+    */
+   @Operation(desc = "Closes all the consumer connections for the given HornetQ address", impact = MBeanOperationInfo.INFO)
+   boolean closeConsumerConnectionsForAddress(@Parameter(desc = "a HornetQ address", name = "address") String address) throws Exception;
+
+   /**
+    * Closes all the connections on this server for sessions using a particular user name.
+    */
+   @Operation(desc = "Closes all the connections for session using a particular user name", impact = MBeanOperationInfo.INFO)
+   boolean closeConnectionsForUser(@Parameter(desc = "a user name", name = "userName") String address) throws Exception;
+
+   /**
     * Lists all the IDs of the connections connected to this server.
     */
    @Operation(desc = "List all the connection IDs", impact = MBeanOperationInfo.INFO)

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java
index 55a55dc..2cf4154 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java
@@ -24,6 +24,31 @@ import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.message.impl.MessageImpl;
 
+import static org.hornetq.reader.BytesMessageUtil.bytesMessageReset;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadBoolean;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadByte;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadBytes;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadChar;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadDouble;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadFloat;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadInt;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadLong;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadShort;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadUTF;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadUnsignedByte;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadUnsignedShort;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteBoolean;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteByte;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteBytes;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteChar;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteDouble;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteFloat;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteInt;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteLong;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteObject;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteShort;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteUTF;
+
 /**
  * HornetQ implementation of a JMS {@link BytesMessage}.
  *
@@ -85,7 +110,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readBoolean();
+         return bytesReadBoolean(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -98,7 +123,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readByte();
+         return bytesReadByte(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -111,7 +136,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readUnsignedByte();
+         return bytesReadUnsignedByte(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -124,7 +149,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readShort();
+         return bytesReadShort(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -137,7 +162,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readUnsignedShort();
+         return bytesReadUnsignedShort(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -150,7 +175,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return (char)getBuffer().readShort();
+         return bytesReadChar(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -163,7 +188,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readInt();
+         return bytesReadInt(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -176,7 +201,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readLong();
+         return bytesReadLong(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -189,7 +214,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return Float.intBitsToFloat(getBuffer().readInt());
+         return bytesReadFloat(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -202,7 +227,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return Double.longBitsToDouble(getBuffer().readLong());
+         return bytesReadDouble(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -215,7 +240,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkRead();
       try
       {
-         return getBuffer().readUTF();
+         return bytesReadUTF(message);
       }
       catch (IndexOutOfBoundsException e)
       {
@@ -232,74 +257,63 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
 
    public int readBytes(final byte[] value) throws JMSException
    {
-      return readBytes(value, value.length);
+      checkRead();
+      return bytesReadBytes(message, value);
    }
 
    public int readBytes(final byte[] value, final int length) throws JMSException
    {
       checkRead();
+      return bytesReadBytes(message, value, length);
 
-      if (!getBuffer().readable())
-      {
-         return -1;
-      }
-
-      int read = Math.min(length, getBuffer().readableBytes());
-
-      if (read != 0)
-      {
-         getBuffer().readBytes(value, 0, read);
-      }
-
-      return read;
    }
 
    public void writeBoolean(final boolean value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeBoolean(value);
+      bytesWriteBoolean(message, value);
    }
 
    public void writeByte(final byte value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeByte(value);
+      bytesWriteByte(message, value);
    }
 
    public void writeShort(final short value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeShort(value);
+      bytesWriteShort(message, value);
    }
 
    public void writeChar(final char value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeShort((short)value);
+      bytesWriteChar(message, value);
    }
 
    public void writeInt(final int value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeInt(value);
+      bytesWriteInt(message, value);
    }
 
    public void writeLong(final long value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeLong(value);
+      bytesWriteLong(message, value);
    }
 
    public void writeFloat(final float value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeInt(Float.floatToIntBits(value));
+      bytesWriteFloat(message, value);
    }
 
    public void writeDouble(final double value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeLong(Double.doubleToLongBits(value));
+      bytesWriteDouble(message, value);
    }
 
    public void writeUTF(final String value) throws JMSException
@@ -307,7 +321,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
       checkWrite();
       try
       {
-         getBuffer().writeUTF(value);
+         bytesWriteUTF(message, value);
       }
       catch (Exception e)
       {
@@ -316,67 +330,25 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
          je.initCause(e);
          throw je;
       }
+
    }
 
    public void writeBytes(final byte[] value) throws JMSException
    {
       checkWrite();
-      getBuffer().writeBytes(value);
+      bytesWriteBytes(message, value);
    }
 
    public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
    {
       checkWrite();
-      getBuffer().writeBytes(value, offset, length);
+      bytesWriteBytes(message, value, offset, length);
    }
 
    public void writeObject(final Object value) throws JMSException
    {
-      if (value == null)
-      {
-         throw new NullPointerException("Attempt to write a null value");
-      }
-      if (value instanceof String)
-      {
-         writeUTF((String)value);
-      }
-      else if (value instanceof Boolean)
-      {
-         writeBoolean((Boolean)value);
-      }
-      else if (value instanceof Character)
-      {
-         writeChar((Character)value);
-      }
-      else if (value instanceof Byte)
-      {
-         writeByte((Byte)value);
-      }
-      else if (value instanceof Short)
-      {
-         writeShort((Short)value);
-      }
-      else if (value instanceof Integer)
-      {
-         writeInt((Integer)value);
-      }
-      else if (value instanceof Long)
-      {
-         writeLong((Long)value);
-      }
-      else if (value instanceof Float)
-      {
-         writeFloat((Float)value);
-      }
-      else if (value instanceof Double)
-      {
-         writeDouble((Double)value);
-      }
-      else if (value instanceof byte[])
-      {
-         writeBytes((byte[])value);
-      }
-      else
+      checkWrite();
+      if (!bytesWriteObject(message, value))
       {
          throw new MessageFormatException("Invalid object for properties");
       }
@@ -389,13 +361,9 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
          readOnly = true;
 
          bodyLength = message.getBodySize();
-
-         getBuffer().resetReaderIndex();
-      }
-      else
-      {
-         getBuffer().resetReaderIndex();
       }
+
+      bytesMessageReset(message);
    }
 
    @Override
@@ -407,11 +375,20 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage
    // HornetQRAMessage overrides ----------------------------------------
 
    @Override
-   public void clearBody()
+   public void clearBody() throws JMSException
    {
       super.clearBody();
 
-      getBuffer().clear();
+      try
+      {
+         getBuffer().clear();
+      }
+      catch (RuntimeException e)
+      {
+         JMSException e2 = new JMSException(e.getMessage());
+         e2.initCause(e);
+         throw e2;
+      }
    }
 
    public long getBodyLength() throws JMSException

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java
index 5c13b1f..5d4413b 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java
@@ -43,6 +43,7 @@ import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.api.jms.HornetQJMSConstants;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.version.Version;
+import org.hornetq.reader.MessageUtil;
 import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.UUIDGenerator;
 import org.hornetq.utils.VersionLoader;
@@ -70,7 +71,7 @@ public class HornetQConnection extends HornetQConnectionForContextImpl implement
 
    public static final String EXCEPTION_DISCONNECT = "DISCONNECT";
 
-   public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__HQ_CID");
+   public static final SimpleString CONNECTION_ID_PROPERTY_NAME = MessageUtil.CONNECTION_ID_PROPERTY_NAME;
 
    // Static ---------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java
index 564da7d..d8d9cf7 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java
@@ -28,6 +28,10 @@ import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.utils.TypedProperties;
 
+
+import static org.hornetq.reader.MapMessageUtil.writeBodyMap;
+import static org.hornetq.reader.MapMessageUtil.readBodyMap;
+
 /**
  * HornetQ implementation of a JMS MapMessage.
  *
@@ -46,7 +50,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag
 
    // Attributes ----------------------------------------------------
 
-   private TypedProperties map = new TypedProperties();
+   private final TypedProperties map = new TypedProperties();
 
    private boolean invalid;
 
@@ -61,8 +65,6 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag
    {
       super(HornetQMapMessage.TYPE, session);
 
-      map = new TypedProperties();
-
       invalid = true;
    }
 
@@ -368,7 +370,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag
    // HornetQRAMessage overrides ----------------------------------------
 
    @Override
-   public void clearBody()
+   public void clearBody() throws JMSException
    {
       super.clearBody();
 
@@ -382,10 +384,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag
    {
       if (invalid)
       {
-         message.getBodyBuffer().resetWriterIndex();
-
-         map.encode(message.getBodyBuffer());
-
+         writeBodyMap(message, map);
          invalid = false;
       }
 
@@ -397,7 +396,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag
    {
       super.doBeforeReceive();
 
-      map.decode(message.getBodyBuffer());
+      readBodyMap(message, map);
    }
 
    // Package protected ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java
index ac66539..16fb52e 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java
@@ -15,12 +15,10 @@ package org.hornetq.jms.client;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 import javax.jms.DeliveryMode;
@@ -40,10 +38,11 @@ import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.jms.HornetQJMSConstants;
-import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.reader.MessageUtil;
 import org.hornetq.utils.UUID;
 
+
 /**
  * HornetQ implementation of a JMS Message.
  * <br>
@@ -64,23 +63,6 @@ import org.hornetq.utils.UUID;
 public class HornetQMessage implements javax.jms.Message
 {
    // Constants -----------------------------------------------------
-
-   private static final SimpleString REPLYTO_HEADER_NAME = ClientMessageImpl.REPLYTO_HEADER_NAME;
-
-   private static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID");
-
-   private static final SimpleString TYPE_HEADER_NAME = new SimpleString("JMSType");
-
-   private static final SimpleString JMS = new SimpleString("JMS");
-
-   private static final SimpleString JMSX = new SimpleString("JMSX");
-
-   private static final SimpleString JMS_ = new SimpleString("JMS_");
-
-   private static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount";
-
-   private static final String JMSXGROUPID = "JMSXGroupID";
-
    public static final byte TYPE = org.hornetq.api.core.Message.DEFAULT_TYPE;
 
    public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage)
@@ -363,55 +345,34 @@ public class HornetQMessage implements javax.jms.Message
 
    public byte[] getJMSCorrelationIDAsBytes() throws JMSException
    {
-      Object obj = message.getObjectProperty(HornetQMessage.CORRELATIONID_HEADER_NAME);
-
-      if (obj instanceof byte[])
-      {
-         return (byte[])obj;
-      }
-      else
-      {
-         return null;
-      }
+      return MessageUtil.getJMSCorrelationIDAsBytes(message);
    }
 
    public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException
    {
-      if (correlationID == null || correlationID.length == 0)
+      try
+      {
+         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
+      }
+      catch (HornetQException e)
       {
-         throw new JMSException("Please specify a non-zero length byte[]");
+         JMSException ex = new JMSException(e.getMessage());
+         ex.initCause(e);
+         throw ex;
       }
-      message.putBytesProperty(HornetQMessage.CORRELATIONID_HEADER_NAME, correlationID);
    }
 
    public void setJMSCorrelationID(final String correlationID) throws JMSException
    {
-      if (correlationID == null)
-      {
-         message.removeProperty(HornetQMessage.CORRELATIONID_HEADER_NAME);
-
-         jmsCorrelationID = null;
-      }
-      else
-      {
-         message.putStringProperty(HornetQMessage.CORRELATIONID_HEADER_NAME, new SimpleString(correlationID));
-
-         jmsCorrelationID = correlationID;
-      }
+      MessageUtil.setJMSCorrelationID(message, correlationID);
+      jmsCorrelationID = correlationID;
    }
 
    public String getJMSCorrelationID() throws JMSException
    {
       if (jmsCorrelationID == null)
       {
-         try
-         {
-            jmsCorrelationID = message.getStringProperty(HornetQMessage.CORRELATIONID_HEADER_NAME);
-         }
-         catch (HornetQPropertyConversionException e)
-         {
-            jmsCorrelationID = null;
-         }
+         jmsCorrelationID = MessageUtil.getJMSCorrelationID(message);
       }
 
       return jmsCorrelationID;
@@ -421,7 +382,8 @@ public class HornetQMessage implements javax.jms.Message
    {
       if (replyTo == null)
       {
-         SimpleString repl = message.getSimpleStringProperty(HornetQMessage.REPLYTO_HEADER_NAME);
+
+         SimpleString repl = MessageUtil.getJMSReplyTo(message);
 
          if (repl != null)
          {
@@ -433,10 +395,10 @@ public class HornetQMessage implements javax.jms.Message
 
    public void setJMSReplyTo(final Destination dest) throws JMSException
    {
+
       if (dest == null)
       {
-         message.removeProperty(HornetQMessage.REPLYTO_HEADER_NAME);
-
+         MessageUtil.setJMSReplyTo(message, null);
          replyTo = null;
       }
       else
@@ -448,7 +410,7 @@ public class HornetQMessage implements javax.jms.Message
 
          HornetQDestination jbd = (HornetQDestination)dest;
 
-         message.putStringProperty(HornetQMessage.REPLYTO_HEADER_NAME, jbd.getSimpleAddress());
+         MessageUtil.setJMSReplyTo(message, jbd.getSimpleAddress());
 
          replyTo = jbd;
       }
@@ -520,7 +482,7 @@ public class HornetQMessage implements javax.jms.Message
    {
       if (type != null)
       {
-         message.putStringProperty(HornetQMessage.TYPE_HEADER_NAME, new SimpleString(type));
+         MessageUtil.setJMSType(message, type);
 
          jmsType = type;
       }
@@ -530,12 +492,7 @@ public class HornetQMessage implements javax.jms.Message
    {
       if (jmsType == null)
       {
-         SimpleString ss = message.getSimpleStringProperty(HornetQMessage.TYPE_HEADER_NAME);
-
-         if (ss != null)
-         {
-            jmsType = ss.toString();
-         }
+         jmsType = MessageUtil.getJMSType(message);
       }
       return jmsType;
    }
@@ -564,35 +521,20 @@ public class HornetQMessage implements javax.jms.Message
 
    public void clearProperties() throws JMSException
    {
-      List<SimpleString> toRemove = new ArrayList<SimpleString>();
-
-      for (SimpleString propName : message.getPropertyNames())
-      {
-         if (!propName.startsWith(HornetQMessage.JMS) || propName.startsWith(HornetQMessage.JMSX) ||
-             propName.startsWith(HornetQMessage.JMS_))
-         {
-            toRemove.add(propName);
-         }
-      }
 
-      for (SimpleString propName : toRemove)
-      {
-         message.removeProperty(propName);
-      }
+      MessageUtil.clearProperties(message);
 
       propertiesReadOnly = false;
    }
 
-   public void clearBody()
+   public void clearBody() throws JMSException
    {
       readOnly = false;
    }
 
    public boolean propertyExists(final String name) throws JMSException
    {
-      return message.containsProperty(new SimpleString(name)) || name.equals(HornetQMessage.JMSXDELIVERYCOUNT) ||
-             HornetQMessage.JMSXGROUPID.equals(name) &&
-             message.containsProperty(org.hornetq.api.core.Message.HDR_GROUP_ID);
+      return MessageUtil.propertyExists(message, name);
    }
 
    public boolean getBooleanProperty(final String name) throws JMSException
@@ -633,7 +575,7 @@ public class HornetQMessage implements javax.jms.Message
 
    public int getIntProperty(final String name) throws JMSException
    {
-      if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
       {
          return message.getDeliveryCount();
       }
@@ -650,7 +592,7 @@ public class HornetQMessage implements javax.jms.Message
 
    public long getLongProperty(final String name) throws JMSException
    {
-      if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
       {
          return message.getDeliveryCount();
       }
@@ -691,14 +633,14 @@ public class HornetQMessage implements javax.jms.Message
 
    public String getStringProperty(final String name) throws JMSException
    {
-      if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
       {
          return String.valueOf(message.getDeliveryCount());
       }
 
       try
       {
-         if (HornetQMessage.JMSXGROUPID.equals(name))
+         if (MessageUtil.JMSXGROUPID.equals(name))
          {
             return message.getStringProperty(org.hornetq.api.core.Message.HDR_GROUP_ID);
          }
@@ -715,7 +657,7 @@ public class HornetQMessage implements javax.jms.Message
 
    public Object getObjectProperty(final String name) throws JMSException
    {
-      if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
       {
          return String.valueOf(message.getDeliveryCount());
       }
@@ -732,20 +674,7 @@ public class HornetQMessage implements javax.jms.Message
    @Override
    public Enumeration getPropertyNames() throws JMSException
    {
-      HashSet<String> set = new HashSet<String>();
-
-      for (SimpleString propName : message.getPropertyNames())
-      {
-         if ((!propName.startsWith(HornetQMessage.JMS) || propName.startsWith(HornetQMessage.JMSX) ||
-             propName.startsWith(HornetQMessage.JMS_)) && !propName.startsWith(HornetQConnection.CONNECTION_ID_PROPERTY_NAME))
-         {
-            set.add(propName.toString());
-         }
-      }
-
-      set.add(HornetQMessage.JMSXDELIVERYCOUNT);
-
-      return Collections.enumeration(set);
+      return Collections.enumeration(MessageUtil.getPropertyNames(message));
    }
 
    public void setBooleanProperty(final String name, final boolean value) throws JMSException
@@ -795,7 +724,7 @@ public class HornetQMessage implements javax.jms.Message
    {
       checkProperty(name);
 
-      if (HornetQMessage.JMSXGROUPID.equals(name))
+      if (MessageUtil.JMSXGROUPID.equals(name))
       {
          message.putStringProperty(org.hornetq.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value));
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java
index 23a49d0..ff1e452 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java
@@ -164,7 +164,7 @@ public class HornetQObjectMessage extends HornetQMessage implements ObjectMessag
    }
 
    @Override
-   public void clearBody()
+   public void clearBody() throws JMSException
    {
       super.clearBody();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java
----------------------------------------------------------------------
diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java
index b496dc0..46ec8f0 100644
--- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java
+++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java
@@ -60,7 +60,7 @@ public class HornetQQueue extends HornetQDestination implements Queue
       super(address, name, temporary, true, session);
    }
 
-   protected HornetQQueue(final String address, final String name)
+   public HornetQQueue(final String address, final String name)
    {
       super(address, name, false, true, null);
    }


Mime
View raw message