activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [35/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:42:05 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java
new file mode 100644
index 0000000..a63f787
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.client.impl;
+
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.client.TopologyMember;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         Created Aug 16, 2010
+ */
+public final class TopologyMemberImpl implements TopologyMember
+{
+   private static final long serialVersionUID = 1123652191795626133L;
+
+   /**
+    * Element A is the live connector, element B the backup connector.
+    */
+   private final Pair<TransportConfiguration, TransportConfiguration> connector;
+
+   private final String backupGroupName;
+
+   private final String scaleDownGroupName;
+
+   /**
+    * transient to avoid serialization changes
+    */
+   private transient long uniqueEventID;
+
+   private final String nodeId;
+
+   /**
+    * Builds a member from its identity, its group names and its connector pair. The unique event ID is
+    * initialised to the current time in milliseconds.
+    *
+    * @param nodeId             the ID reported by {@link #getNodeId()}
+    * @param backupGroupName    the name reported by {@link #getBackupGroupName()}
+    * @param scaleDownGroupName the name reported by {@link #getScaleDownGroupName()}
+    * @param a                  the live connector
+    * @param b                  the backup connector
+    */
+   public TopologyMemberImpl(String nodeId, final String backupGroupName, final String scaleDownGroupName, final TransportConfiguration a,
+                             final TransportConfiguration b)
+   {
+      this.connector = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
+      this.nodeId = nodeId;
+      this.backupGroupName = backupGroupName;
+      this.scaleDownGroupName = scaleDownGroupName;
+      this.uniqueEventID = System.currentTimeMillis();
+   }
+
+   @Override
+   public TransportConfiguration getLive()
+   {
+      return connector.getA();
+   }
+
+   @Override
+   public TransportConfiguration getBackup()
+   {
+      return connector.getB();
+   }
+
+   /**
+    * Replaces the backup connector (element B of the pair).
+    */
+   public void setBackup(final TransportConfiguration param)
+   {
+      connector.setB(param);
+   }
+
+   /**
+    * Replaces the live connector (element A of the pair).
+    */
+   public void setLive(final TransportConfiguration param)
+   {
+      connector.setA(param);
+   }
+
+   @Override
+   public String getNodeId()
+   {
+      return nodeId;
+   }
+
+   @Override
+   public long getUniqueEventID()
+   {
+      return uniqueEventID;
+   }
+
+   @Override
+   public String getBackupGroupName()
+   {
+      return backupGroupName;
+   }
+
+   @Override
+   public String getScaleDownGroupName()
+   {
+      return scaleDownGroupName;
+   }
+
+   /**
+    * @param uniqueEventID the uniqueEventID to set
+    */
+   public void setUniqueEventID(final long uniqueEventID)
+   {
+      this.uniqueEventID = uniqueEventID;
+   }
+
+   /**
+    * @return the internal live/backup pair itself, not a copy
+    */
+   public Pair<TransportConfiguration, TransportConfiguration> getConnector()
+   {
+      return connector;
+   }
+
+   /**
+    * Tests the connector configuration of the connection's transport; a connection without a transport
+    * connection is tested as a null configuration.
+    */
+   public boolean isMember(RemotingConnection connection)
+   {
+      TransportConfiguration connectorConfig = null;
+
+      if (connection.getTransportConnection() != null)
+      {
+         connectorConfig = connection.getTransportConnection().getConnectorConfig();
+      }
+
+      return isMember(connectorConfig);
+   }
+
+   /**
+    * @return true when the given configuration equals this member's live or backup connector
+    */
+   public boolean isMember(TransportConfiguration configuration)
+   {
+      final TransportConfiguration live = connector.getA();
+
+      if (live != null && live.equals(configuration))
+      {
+         return true;
+      }
+
+      final TransportConfiguration backup = connector.getB();
+
+      return backup != null && backup.equals(configuration);
+   }
+
+   @Override
+   public String toString()
+   {
+      return "TopologyMember[id = " + nodeId +
+         ", connector=" + connector +
+         ", backupGroupName=" + backupGroupName +
+         ", scaleDownGroupName=" + scaleDownGroupName + "]";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java
new file mode 100644
index 0000000..e5bfae4
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.cluster;
+
+import org.apache.activemq6.api.core.TransportConfiguration;
+
+/**
+ * A DiscoveryEntry
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DiscoveryEntry
+{
+   // ID of the node this entry was recorded for
+   private final String nodeID;
+   // connector configuration recorded for that node
+   private final TransportConfiguration connector;
+   // timestamp supplied by the creator of the entry
+   private final long lastUpdate;
+
+
+   /**
+    * Creates an entry; all three values are fixed for the lifetime of the object.
+    *
+    * @param nodeID     value returned by {@link #getNodeID()}
+    * @param connector  value returned by {@link #getConnector()}
+    * @param lastUpdate value returned by {@link #getLastUpdate()}
+    */
+   public DiscoveryEntry(final String nodeID, final TransportConfiguration connector, final long lastUpdate)
+   {
+      this.nodeID = nodeID;
+      this.connector = connector;
+      this.lastUpdate = lastUpdate;
+   }
+
+   /**
+    * @return the node ID given at construction
+    */
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   /**
+    * @return the connector configuration given at construction
+    */
+   public TransportConfiguration getConnector()
+   {
+      return connector;
+   }
+
+   /**
+    * @return the timestamp given at construction
+    */
+   public long getLastUpdate()
+   {
+      return lastUpdate;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "DiscoveryEntry[nodeID=" + nodeID + ", connector=" + connector + ", lastUpdate=" + lastUpdate + "]";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java
new file mode 100644
index 0000000..fd2ae35
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java
@@ -0,0 +1,432 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq6.api.core.BroadcastEndpoint;
+import org.apache.activemq6.api.core.BroadcastEndpointFactory;
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.management.CoreNotificationType;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.server.HornetQComponent;
+import org.apache.activemq6.core.server.management.Notification;
+import org.apache.activemq6.core.server.management.NotificationService;
+import org.apache.activemq6.utils.TypedProperties;
+
+/**
+ * This class is used to search for members on the cluster through the opaque interface {@link BroadcastEndpoint}.
+ * <p>
+ * There are two current implementations, and that's probably all we will ever need.
+ * <p>
+ * We will probably keep both interfaces for a while as UDP is a simple solution requiring no extra dependencies which
+ * is suitable for users looking for embedded solutions.
+ * <p>
+ * Created 17 Nov 2008 13:21:45
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
+ */
+public final class DiscoveryGroup implements HornetQComponent
+{
+   private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled();
+
+   private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+   private final String name;
+
+   private Thread thread;
+
+   private boolean received;
+
+   private final Object waitLock = new Object();
+
+   private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<String, DiscoveryEntry>();
+
+   private final long timeout;
+
+   private volatile boolean started;
+
+   private final String nodeID;
+
+   private final Map<String, String> uniqueIDMap = new HashMap<String, String>();
+
+   private final BroadcastEndpoint endpoint;
+
+   private final NotificationService notificationService;
+
+   /**
+    * This is the main constructor. It creates the group in the stopped state; call {@link #start()} to
+    * begin listening for broadcasts.
+    *
+    * @param nodeID          ID of the local node; broadcasts carrying this ID are not added to the connector map
+    * @param name            name of this group, used in the discovery thread name and in the start/stop notifications
+    * @param timeout         an entry is dropped once its last update plus this value is not later than the current
+    *                        time in milliseconds
+    * @param endpointFactory asked once, in this constructor, for the {@link BroadcastEndpoint} to listen on
+    * @param service         receives the start/stop notifications; may be null, in which case none are sent
+    * @throws Exception propagated from {@code endpointFactory.createBroadcastEndpoint()}
+    */
+   public DiscoveryGroup(final String nodeID, final String name, final long timeout,
+                         BroadcastEndpointFactory endpointFactory,
+                         NotificationService service) throws Exception
+   {
+      this.nodeID = nodeID;
+      this.name = name;
+      this.timeout = timeout;
+      this.endpoint = endpointFactory.createBroadcastEndpoint();
+      this.notificationService = service;
+   }
+
+   /**
+    * Opens the endpoint as a client, launches the daemon discovery thread and, when a notification service
+    * was supplied, sends a DISCOVERY_GROUP_STARTED notification. Does nothing if the group is already started.
+    *
+    * @throws Exception propagated from opening the endpoint or from sending the notification
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      endpoint.openClient();
+
+      started = true;
+
+      final Thread worker = new Thread(new DiscoveryRunnable(), "hornetq-discovery-group-thread-" + name);
+      worker.setDaemon(true);
+      thread = worker;
+      worker.start();
+
+      if (notificationService == null)
+      {
+         return;
+      }
+
+      final TypedProperties properties = new TypedProperties();
+      properties.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+
+      notificationService.sendNotification(new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STARTED, properties));
+   }
+
+   /**
+    * Stops the group: clears the started flag, wakes threads blocked in {@link #waitForBroadcast(long)},
+    * closes the endpoint, interrupts the discovery thread and waits up to 10 seconds for it to end, then
+    * sends a DISCOVERY_GROUP_STOPPED notification when a notification service was supplied.
+    * Does nothing if the group is not started.
+    *
+    * @throws HornetQInterruptedException if the calling thread is interrupted while joining the discovery thread
+    */
+   public void stop()
+   {
+      // Only the flag flip happens under the group's monitor; closing and joining are done outside of it.
+      synchronized (this)
+      {
+         if (!started)
+         {
+            return;
+         }
+
+         started = false;
+      }
+
+      // Wake any waiter so its loop can observe started == false
+      synchronized (waitLock)
+      {
+         waitLock.notifyAll();
+      }
+
+      try
+      {
+         endpoint.close(false);
+      }
+      catch (Exception e1)
+      {
+         // Failure to close is logged only; shutdown continues
+         HornetQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1);
+      }
+
+      try
+      {
+         thread.interrupt();
+         thread.join(10000);
+         if (thread.isAlive())
+         {
+            HornetQClientLogger.LOGGER.timedOutStoppingDiscovery();
+         }
+      }
+      catch (InterruptedException e)
+      {
+         // NOTE(review): leaving here skips 'thread = null' and the STOPPED notification below
+         throw new HornetQInterruptedException(e);
+      }
+
+      thread = null;
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+         Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props);
+         try
+         {
+            notificationService.sendNotification(notification);
+         }
+         catch (Exception e)
+         {
+            // A failed notification is logged and does not fail the stop
+            HornetQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e);
+         }
+      }
+   }
+
+   /**
+    * @return the current value of the volatile started flag, set by {@link #start()} and cleared by {@link #stop()}
+    */
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   /**
+    * @return the group name given at construction
+    */
+   public String getName()
+   {
+      return name;
+   }
+
+   /**
+    * Returns a snapshot of the entries currently known to this group.
+    *
+    * @return a new list holding the current values of the connector map; changing it does not affect the group
+    */
+   public synchronized List<DiscoveryEntry> getDiscoveryEntries()
+   {
+      return new ArrayList<DiscoveryEntry>(connectors.values());
+   }
+
+   /**
+    * Blocks until the discovery thread reports a broadcast, the group is stopped, or the timeout runs out.
+    * The received flag is consumed: it is reset to false before this method returns.
+    *
+    * @param timeout maximum time to wait in milliseconds; 0 waits without a time limit
+    * @return true if a broadcast had been received when the wait ended
+    * @throws HornetQInterruptedException if the calling thread is interrupted while waiting
+    */
+   public boolean waitForBroadcast(final long timeout)
+   {
+      synchronized (waitLock)
+      {
+         final boolean unbounded = timeout == 0;
+
+         long lastCheck = System.currentTimeMillis();
+
+         long remaining = timeout;
+
+         while (started && !received && (remaining > 0 || unbounded))
+         {
+            try
+            {
+               waitLock.wait(remaining);
+            }
+            catch (InterruptedException e)
+            {
+               throw new HornetQInterruptedException(e);
+            }
+
+            if (!unbounded)
+            {
+               // Charge the time actually spent waiting against the remaining budget
+               final long now = System.currentTimeMillis();
+
+               remaining -= now - lastCheck;
+
+               lastCheck = now;
+            }
+         }
+
+         final boolean sawBroadcast = received;
+
+         received = false;
+
+         return sawBroadcast;
+      }
+   }
+
+   /*
+    * Sanity check for two different servers broadcasting under the same node id (misconfiguration or a
+    * failover problem): remembers the last unique id seen per node id and logs when it changes.
+    */
+   private void checkUniqueID(final String originatingNodeID, final String uniqueID)
+   {
+      final String knownUniqueID = uniqueIDMap.get(originatingNodeID);
+
+      if (knownUniqueID != null && knownUniqueID.equals(uniqueID))
+      {
+         // Same unique id as last time, nothing to record
+         return;
+      }
+
+      if (knownUniqueID != null)
+      {
+         HornetQClientLogger.LOGGER.multipleServersBroadcastingSameNode(originatingNodeID);
+      }
+
+      uniqueIDMap.put(originatingNodeID, uniqueID);
+   }
+
+   /**
+    * Body of the discovery thread. While the group is started it receives broadcasts from the endpoint,
+    * stores the announced connectors in the connector map, expires stale entries, calls the listeners when
+    * the map changed and wakes threads blocked in {@code waitForBroadcast}.
+    * <p>
+    * Packet layout as read here: node ID string, unique ID string, int connector count, then that many
+    * encoded {@link TransportConfiguration}s.
+    */
+   class DiscoveryRunnable implements Runnable
+   {
+      public void run()
+      {
+         try
+         {
+            byte[] data = null;
+
+            while (started)
+            {
+               try
+               {
+
+                  data = endpoint.receiveBroadcast();
+                  if (data == null)
+                  {
+                     if (started)
+                     {
+                        // This is totally unexpected, so I'm not even bothering on creating
+                        // a log entry for that
+                        HornetQClientLogger.LOGGER.warn("Unexpected null data received from DiscoveryEndpoint");
+                     }
+                     break;
+                  }
+               }
+               catch (Exception e)
+               {
+                  if (!started)
+                  {
+                     // stop() cleared the flag (it also closes the endpoint), so this failure is expected
+                     return;
+                  }
+
+                  HornetQClientLogger.LOGGER.errorReceivingPAcketInDiscovery(e);
+
+                  // Nothing new was received: 'data' is still null or still holds the previous packet,
+                  // so skip decoding and go back to receiving.
+                  continue;
+               }
+
+               HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(data);
+
+               String originatingNodeID = buffer.readString();
+
+               String uniqueID = buffer.readString();
+
+               checkUniqueID(originatingNodeID, uniqueID);
+
+               if (nodeID.equals(originatingNodeID))
+               {
+                  if (checkExpiration())
+                  {
+                     callListeners();
+                  }
+                  // Ignore traffic from own node
+                  continue;
+               }
+
+               int size = buffer.readInt();
+
+               boolean changed = false;
+
+               DiscoveryEntry[] entriesRead = new DiscoveryEntry[size];
+               // Will first decode all the elements outside of any lock
+               for (int i = 0; i < size; i++)
+               {
+                  TransportConfiguration connector = new TransportConfiguration();
+
+                  connector.decode(buffer);
+
+                  entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
+               }
+
+               synchronized (DiscoveryGroup.this)
+               {
+                  // Every entry is stored under the sender's node ID, so the last one read wins;
+                  // only a node that was not in the map before counts as a change here.
+                  for (DiscoveryEntry entry : entriesRead)
+                  {
+                     if (connectors.put(originatingNodeID, entry) == null)
+                     {
+                        changed = true;
+                     }
+                  }
+
+                  changed = changed || checkExpiration();
+               }
+               //only call the listeners if we have changed
+               //also make sure that we aren't stopping to avoid deadlock
+               if (changed && started)
+               {
+                  if (isTrace)
+                  {
+                     HornetQClientLogger.LOGGER.trace("Connectors changed on Discovery:");
+                     for (DiscoveryEntry connector : connectors.values())
+                     {
+                        HornetQClientLogger.LOGGER.trace(connector);
+                     }
+                  }
+                  callListeners();
+               }
+
+               // Signal waitForBroadcast() callers that a foreign broadcast has been processed
+               synchronized (waitLock)
+               {
+                  received = true;
+
+                  waitLock.notifyAll();
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            // Any failure while decoding or dispatching ends the discovery thread after logging
+            HornetQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
+         }
+      }
+
+   }
+
+   /**
+    * Adds a listener. If the group already knows at least one entry, the listener is called right away,
+    * on the calling thread, with a snapshot of the current entries.
+    *
+    * @param listener the listener to add
+    */
+   public synchronized void registerListener(final DiscoveryListener listener)
+   {
+      listeners.add(listener);
+
+      if (connectors.isEmpty())
+      {
+         return;
+      }
+
+      listener.connectorsChanged(getDiscoveryEntries());
+   }
+
+   /**
+    * Removes a previously registered listener from the listener list.
+    *
+    * @param listener the listener to remove
+    */
+   public synchronized void unregisterListener(final DiscoveryListener listener)
+   {
+      listeners.remove(listener);
+   }
+
+   /**
+    * Calls every registered listener with its own snapshot of the current entries. A listener that throws
+    * is logged and does not stop the remaining listeners from being called.
+    */
+   private void callListeners()
+   {
+      // 'listeners' is a plain ArrayList changed by the synchronized register/unregisterListener methods.
+      // Copy it under the same monitor so a concurrent change cannot break the iteration, and invoke the
+      // listeners outside the monitor.
+      final List<DiscoveryListener> snapshot;
+
+      synchronized (this)
+      {
+         snapshot = new ArrayList<DiscoveryListener>(listeners);
+      }
+
+      for (DiscoveryListener listener : snapshot)
+      {
+         try
+         {
+            listener.connectorsChanged(getDiscoveryEntries());
+         }
+         catch (Throwable t)
+         {
+            // Catch it so exception doesn't prevent other listeners from running
+            HornetQClientLogger.LOGGER.failedToCallListenerInDiscovery(t);
+         }
+      }
+   }
+
+   /**
+    * Removes every entry whose last update plus the group timeout is not later than the current time.
+    *
+    * @return true if at least one entry was removed
+    */
+   private boolean checkExpiration()
+   {
+      final long now = System.currentTimeMillis();
+      boolean removedAny = false;
+
+      // Weed out any expired connectors
+      for (Iterator<Map.Entry<String, DiscoveryEntry>> it = connectors.entrySet().iterator(); it.hasNext();)
+      {
+         final DiscoveryEntry candidate = it.next().getValue();
+
+         if (candidate.getLastUpdate() + timeout > now)
+         {
+            // Still fresh
+            continue;
+         }
+
+         if (isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("Timed out node on discovery:" + candidate);
+         }
+
+         it.remove();
+         removedAny = true;
+      }
+
+      return removedAny;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java
new file mode 100644
index 0000000..032ba68
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.cluster;
+
+import java.util.List;
+
+/**
+ * To be called any time Discovery changes its list of nodes.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author Clebert Suconic
+ *
+ * Created 17 Nov 2008 14:30:39
+ *
+ *
+ */
+public interface DiscoveryListener
+{
+   /**
+    * Invoked with the discovery entries known at the time of the call.
+    *
+    * @param newConnectors the current discovery entries
+    */
+   void connectorsChanged(List<DiscoveryEntry> newConnectors);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java
new file mode 100644
index 0000000..f72f2b2
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.exception;
+
+import javax.transaction.xa.XAException;
+
+/**
+ * A HornetQXAException
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class HornetQXAException extends XAException
+{
+   private static final long serialVersionUID = 6535914602965015803L;
+
+   /**
+    * Creates an exception carrying both an XA error code and a detail message.
+    *
+    * @param errorCode value stored in the inherited {@code errorCode} field
+    * @param message   the detail message
+    */
+   public HornetQXAException(final int errorCode, final String message)
+   {
+      super(message);
+
+      // The message-taking super constructor takes no code, so the inherited field is assigned here
+      this.errorCode = errorCode;
+   }
+
+   /**
+    * Creates an exception carrying only an XA error code.
+    *
+    * @param errorCode the error code passed to the superclass
+    */
+   public HornetQXAException(final int errorCode)
+   {
+      super(errorCode);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java
new file mode 100644
index 0000000..923beb1
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+
+/**
+ * Class used to encode message body into buffers.
+ * <br>
+ * Used to send large streams over the wire
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public interface BodyEncoder
+{
+   /**
+    * This method must not be called directly by HornetQ clients.
+    * <p>
+    * NOTE(review): presumably prepares the body source before any {@code encode} call - confirm against implementations.
+    */
+   void open() throws HornetQException;
+
+   /**
+    * This method must not be called directly by HornetQ clients.
+    * <p>
+    * NOTE(review): presumably releases whatever {@link #open()} acquired - confirm against implementations.
+    */
+   void close() throws HornetQException;
+
+   /**
+    * This method must not be called directly by HornetQ clients.
+    *
+    * @param bufferRead the NIO buffer involved in the encoding
+    * @return NOTE(review): presumably the number of bytes encoded - confirm against implementations
+    */
+   int encode(ByteBuffer bufferRead) throws HornetQException;
+
+   /**
+    * This method must not be called directly by HornetQ clients.
+    *
+    * @param bufferOut the buffer involved in the encoding
+    * @param size      NOTE(review): presumably the number of bytes requested - confirm against implementations
+    * @return NOTE(review): presumably the number of bytes encoded - confirm against implementations
+    */
+   int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+
+   /**
+    * This method must not be called directly by HornetQ clients.
+    *
+    * @return the size of the large body; NOTE(review): unit presumably bytes - confirm
+    */
+   long getLargeBodySize();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java
new file mode 100644
index 0000000..b797f38
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java
@@ -0,0 +1,1126 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.message.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQPropertyConversionException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.apache.activemq6.core.message.BodyEncoder;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq6.utils.ByteUtil;
+import org.apache.activemq6.utils.DataConstants;
+import org.apache.activemq6.utils.TypedProperties;
+import org.apache.activemq6.utils.UUID;
+
+/**
+ * A concrete implementation of a message
+ * <p>
+ * All messages handled by HornetQ core are of this type
+ *
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version <tt>$Revision: 2740 $</tt>
+ */
+public abstract class MessageImpl implements MessageInternal
+{
+   public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
+
+   public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_HQ_SCALEDOWN_TO");
+
+   public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_HQ_ACK_ROUTE_TO");
+
+   // used by the bridges to set duplicates
+   public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_HQ_BRIDGE_DUP");
+
+   public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
+
+   public static final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
+
+   protected long messageID;
+
+   protected SimpleString address;
+
+   protected byte type;
+
+   protected boolean durable;
+
+   /**
+    * GMT milliseconds at which this message expires. 0 means never expires *
+    */
+   private long expiration;
+
+   protected long timestamp;
+
+   protected TypedProperties properties;
+
+   protected byte priority;
+
+   protected HornetQBuffer buffer;
+
+   protected ResetLimitWrappedHornetQBuffer bodyBuffer;
+
+   protected volatile boolean bufferValid;
+
+   private int endOfBodyPosition = -1;
+
+   private int endOfMessagePosition;
+
+   private boolean copied = true;
+
+   private boolean bufferUsed;
+
+   private UUID userID;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a message with an empty property set and no buffer.
+    */
+   protected MessageImpl()
+   {
+      properties = new TypedProperties();
+   }
+
+   /**
+    * overridden by the client message, we need access to the connection so we can create the appropriate HornetQBuffer.
+    * <p>
+    * Note that this constructor calls the public, overridable {@link #createBody(int)}.
+    *
+    * @param type                     the message type byte
+    * @param durable                  whether the message is durable
+    * @param expiration               GMT milliseconds at which the message expires, 0 meaning it never expires
+    * @param timestamp                the message timestamp
+    * @param priority                 the message priority
+    * @param initialMessageBufferSize the initial size handed to {@link #createBody(int)}
+    */
+   protected MessageImpl(final byte type,
+                         final boolean durable,
+                         final long expiration,
+                         final long timestamp,
+                         final byte priority,
+                         final int initialMessageBufferSize)
+   {
+      this();
+      this.type = type;
+      this.durable = durable;
+      this.expiration = expiration;
+      this.timestamp = timestamp;
+      this.priority = priority;
+      createBody(initialMessageBufferSize);
+   }
+
+   /**
+    * Creates a message with default header values and a freshly created body buffer.
+    *
+    * @param initialMessageBufferSize the initial size handed to {@link #createBody(int)}
+    */
+   protected MessageImpl(final int initialMessageBufferSize)
+   {
+      this();
+      createBody(initialMessageBufferSize);
+   }
+
+   /*
+    * Copy constructor
+    *
+    * Copies the other message, including a copy of its own properties.
+    */
+   protected MessageImpl(final MessageImpl other)
+   {
+      this(other, other.getProperties());
+   }
+
+   /*
+    * Copy constructor
+    *
+    * Copies the headers of the other message, takes a copy of the given properties (not of the
+    * other message's properties) and duplicates the other message's buffer, if it has one.
+    */
+   protected MessageImpl(final MessageImpl other, TypedProperties properties)
+   {
+      messageID = other.getMessageID();
+      userID = other.getUserID();
+      address = other.getAddress();
+      type = other.getType();
+      durable = other.isDurable();
+      expiration = other.getExpiration();
+      timestamp = other.getTimestamp();
+      priority = other.getPriority();
+      // Private copy, so later property changes on either message do not leak into the other
+      this.properties = new TypedProperties(properties);
+
+      // This MUST be synchronized using the monitor on the other message to prevent it running concurrently
+      // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
+      // many subscriptions and bridging to other nodes in a cluster
+      synchronized (other)
+      {
+         bufferValid = other.bufferValid;
+         endOfBodyPosition = other.endOfBodyPosition;
+         endOfMessagePosition = other.endOfMessagePosition;
+         copied = other.copied;
+
+         if (other.buffer != null)
+         {
+            // Flag the source buffer as used, so the source copies it before re-encoding into it
+            other.bufferUsed = true;
+
+            // We need to copy the underlying buffer too, since the different messages thereafter might have different
+            // properties set on them, making their encoding different
+            buffer = other.buffer.copy(0, other.buffer.writerIndex());
+
+            // Keep the source's reader position; the writer index goes to the end of the copy
+            buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
+         }
+      }
+   }
+
+   // Message implementation ----------------------------------------
+
+   /**
+    * Returns the encoded size of this message: the body bytes, the encoded headers and properties,
+    * and the two ints that store the end-of-body and end-of-message positions.
+    */
+   public int getEncodeSize()
+   {
+      final int headersAndProperties = getHeadersAndPropertiesEncodeSize();
+
+      // BODY_OFFSET is BUFFER_HEADER_SPACE + SIZE_INT, i.e. where the body starts in the buffer
+      final int bodyLength = getEndOfBodyPosition() - BODY_OFFSET;
+
+      return DataConstants.SIZE_INT + bodyLength + DataConstants.SIZE_INT + headersAndProperties;
+   }
+
+   /**
+    * Returns the number of bytes {@link #encodeHeadersAndProperties(HornetQBuffer)} writes for the
+    * current header values and properties.
+    */
+   public int getHeadersAndPropertiesEncodeSize()
+   {
+      int size = DataConstants.SIZE_LONG; // message ID
+      size += DataConstants.SIZE_BYTE; // user ID null marker
+      if (userID != null)
+      {
+         size += 16; // user ID bytes
+      }
+      size += SimpleString.sizeofNullableString(address); // address
+      size += DataConstants.SIZE_BYTE; // type
+      size += DataConstants.SIZE_BOOLEAN; // durable
+      size += DataConstants.SIZE_LONG; // expiration
+      size += DataConstants.SIZE_LONG; // timestamp
+      size += DataConstants.SIZE_BYTE; // priority
+      size += properties.getEncodeSize(); // property size and properties
+      return size;
+   }
+
+
+   /**
+    * Writes the headers and properties to the buffer at its current writer position.
+    * <p>
+    * The order is: message ID, address, user ID (a null marker byte followed by the ID bytes when set),
+    * type, durable, expiration, timestamp, priority, properties.
+    * {@link #decodeHeadersAndProperties(HornetQBuffer)} reads the fields in the same order.
+    *
+    * @param buffer the buffer to write to
+    */
+   public void encodeHeadersAndProperties(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(messageID);
+      buffer.writeNullableSimpleString(address);
+      if (userID == null)
+      {
+         buffer.writeByte(DataConstants.NULL);
+      }
+      else
+      {
+         buffer.writeByte(DataConstants.NOT_NULL);
+         buffer.writeBytes(userID.asBytes());
+      }
+      buffer.writeByte(type);
+      buffer.writeBoolean(durable);
+      buffer.writeLong(expiration);
+      buffer.writeLong(timestamp);
+      buffer.writeByte(priority);
+      properties.encode(buffer);
+   }
+
+   /**
+    * Reads the headers and properties from the buffer at its current reader position, in the order
+    * {@link #encodeHeadersAndProperties(HornetQBuffer)} writes them, and overwrites the
+    * corresponding fields of this message.
+    *
+    * @param buffer the buffer to read from
+    */
+   public void decodeHeadersAndProperties(final HornetQBuffer buffer)
+   {
+      messageID = buffer.readLong();
+      address = buffer.readNullableSimpleString();
+      if (buffer.readByte() == DataConstants.NOT_NULL)
+      {
+         // The user ID is stored as 16 raw bytes and is always rebuilt as a time-based UUID
+         byte[] bytes = new byte[16];
+         buffer.readBytes(bytes);
+         userID = new UUID(UUID.TYPE_TIME_BASED, bytes);
+      }
+      else
+      {
+         userID = null;
+      }
+      type = buffer.readByte();
+      durable = buffer.readBoolean();
+      expiration = buffer.readLong();
+      timestamp = buffer.readLong();
+      priority = buffer.readByte();
+      properties.decode(buffer);
+   }
+
+   /**
+    * Takes over the header values and the properties of the given message.
+    * <p>
+    * The {@link TypedProperties} instance is shared with {@code msg}, not copied, and this method
+    * does not touch {@code bufferValid}.
+    *
+    * @param msg the message to take headers and properties from
+    */
+   public void copyHeadersAndProperties(final MessageInternal msg)
+   {
+      messageID = msg.getMessageID();
+      address = msg.getAddress();
+      userID = msg.getUserID();
+      type = msg.getType();
+      durable = msg.isDurable();
+      expiration = msg.getExpiration();
+      timestamp = msg.getTimestamp();
+      priority = msg.getPriority();
+      // Same instance as in msg: property changes on one message are visible through the other
+      properties = msg.getTypedProperties();
+   }
+
+   /**
+    * Returns the body view of this message's buffer, creating it on first use.
+    * The view starts at {@link #BODY_OFFSET} of the whole buffer.
+    */
+   public HornetQBuffer getBodyBuffer()
+   {
+      if (bodyBuffer != null)
+      {
+         return bodyBuffer;
+      }
+
+      bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
+
+      return bodyBuffer;
+   }
+
+   /**
+    * Appends the given bytes to the body buffer.
+    *
+    * @return this message
+    */
+   public Message writeBodyBufferBytes(byte[] bytes)
+   {
+      final HornetQBuffer body = getBodyBuffer();
+      body.writeBytes(bytes);
+      return this;
+   }
+
+   /**
+    * Appends the given string to the body buffer.
+    *
+    * @return this message
+    */
+   public Message writeBodyBufferString(String string)
+   {
+      final HornetQBuffer body = getBodyBuffer();
+      body.writeString(string);
+      return this;
+   }
+
+   /**
+    * Does nothing for regular messages.
+    */
+   public void checkCompletion() throws HornetQException
+   {
+      // no op on regular messages
+   }
+
+
+   /**
+    * Returns a body view over a copy of the whole buffer, so the caller cannot affect this
+    * message's own buffer.
+    */
+   public synchronized HornetQBuffer getBodyBufferCopy()
+   {
+      // The buffer has to be duplicated before it is handed out
+      final HornetQBuffer duplicate = buffer.copy(0, buffer.capacity());
+
+      duplicate.setIndex(0, getEndOfBodyPosition());
+
+      return new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, duplicate, null);
+   }
+
+   /**
+    * @return the ID of this message
+    */
+   public long getMessageID()
+   {
+      return this.messageID;
+   }
+
+   /**
+    * @return the user ID of this message, or {@code null} when none is set
+    */
+   public UUID getUserID()
+   {
+      return this.userID;
+   }
+
+   /**
+    * Sets the user ID of this message.
+    * <p>
+    * The user ID is part of what {@link #encodeHeadersAndProperties(HornetQBuffer)} writes, so a
+    * change marks the cached encoding as stale, as the other header setters do.
+    *
+    * @param userID the new user ID, may be {@code null}
+    * @return this message
+    */
+   public MessageImpl setUserID(final UUID userID)
+   {
+      if (this.userID != userID)
+      {
+         this.userID = userID;
+
+         bufferValid = false;
+      }
+      return this;
+   }
+
+   /**
+    * No synchronization is needed here: the lock taken in setAddress guards the buffer,
+    * not the address field.
+    *
+    * @return the address of this message, may be {@code null}
+    */
+   public SimpleString getAddress()
+   {
+      return this.address;
+   }
+
+   /**
+    * The only reason this is synchronized is because of encoding a message versus invalidating the buffer.
+    * This synchronization can probably be removed since setAddress is always called from a single thread.
+    * However I will keep it as it's harmless and it's been well tested
+    * <p>
+    * The new address is compared with the current one by reference, so an equal but distinct
+    * {@link SimpleString} instance still invalidates the encoded buffer.
+    *
+    * @param address the new address
+    * @return this message
+    */
+   public Message setAddress(final SimpleString address)
+   {
+      // This is protecting the buffer
+      synchronized (this)
+      {
+         if (this.address != address)
+         {
+            this.address = address;
+
+            bufferValid = false;
+         }
+      }
+
+      return this;
+   }
+
+   /**
+    * @return the type byte of this message
+    */
+   public byte getType()
+   {
+      return this.type;
+   }
+
+   /**
+    * Sets the type byte of this message.
+    * <p>
+    * The type is part of what {@link #encodeHeadersAndProperties(HornetQBuffer)} writes, so a change
+    * marks the cached encoding as stale, as the other header setters do.
+    *
+    * @param type the new type byte
+    */
+   public void setType(byte type)
+   {
+      if (this.type != type)
+      {
+         this.type = type;
+
+         bufferValid = false;
+      }
+   }
+
+   /**
+    * @return whether this message is durable
+    */
+   public boolean isDurable()
+   {
+      return this.durable;
+   }
+
+   /**
+    * Sets the durable flag; a change invalidates the encoded buffer.
+    *
+    * @return this message
+    */
+   public MessageImpl setDurable(final boolean durable)
+   {
+      if (this.durable == durable)
+      {
+         return this;
+      }
+
+      this.durable = durable;
+      bufferValid = false;
+      return this;
+   }
+
+   /**
+    * @return GMT milliseconds at which this message expires, 0 meaning it never expires
+    */
+   public long getExpiration()
+   {
+      return this.expiration;
+   }
+
+   /**
+    * Sets the expiration time; a change invalidates the encoded buffer.
+    *
+    * @return this message
+    */
+   public MessageImpl setExpiration(final long expiration)
+   {
+      if (this.expiration == expiration)
+      {
+         return this;
+      }
+
+      this.expiration = expiration;
+      bufferValid = false;
+      return this;
+   }
+
+   /**
+    * @return the timestamp of this message
+    */
+   public long getTimestamp()
+   {
+      return this.timestamp;
+   }
+
+   /**
+    * Sets the timestamp; a change invalidates the encoded buffer.
+    *
+    * @return this message
+    */
+   public MessageImpl setTimestamp(final long timestamp)
+   {
+      if (this.timestamp == timestamp)
+      {
+         return this;
+      }
+
+      this.timestamp = timestamp;
+      bufferValid = false;
+      return this;
+   }
+
+   /**
+    * @return the priority of this message
+    */
+   public byte getPriority()
+   {
+      return this.priority;
+   }
+
+   /**
+    * Sets the priority; a change invalidates the encoded buffer.
+    *
+    * @return this message
+    */
+   public MessageImpl setPriority(final byte priority)
+   {
+      if (this.priority == priority)
+      {
+         return this;
+      }
+
+      this.priority = priority;
+      bufferValid = false;
+      return this;
+   }
+
+   /**
+    * @return {@code true} when an expiration time is set (non-zero) and the current time has reached it
+    */
+   public boolean isExpired()
+   {
+      // An expiration of 0 means the message never expires
+      return expiration != 0 && System.currentTimeMillis() - expiration >= 0;
+   }
+
+   /**
+    * Returns the headers and properties of this message as a map.
+    * <p>
+    * The header keys are {@code messageID}, {@code userID}, {@code address}, {@code type},
+    * {@code durable}, {@code expiration}, {@code timestamp} and {@code priority}; every property is
+    * added under its own name. {@code userID} and {@code address} are left out when they are not set.
+    *
+    * @return a new map, never {@code null}
+    */
+   public Map<String, Object> toMap()
+   {
+      Map<String, Object> map = new HashMap<String, Object>();
+
+      map.put("messageID", messageID);
+      if (userID != null)
+      {
+         map.put("userID", "ID:" + userID.toString());
+      }
+      // The address is nullable (it is encoded as a nullable string), so guard against it
+      if (address != null)
+      {
+         map.put("address", address.toString());
+      }
+      map.put("type", type);
+      map.put("durable", durable);
+      map.put("expiration", expiration);
+      map.put("timestamp", timestamp);
+      map.put("priority", priority);
+      for (SimpleString propName : properties.getPropertyNames())
+      {
+         map.put(propName.toString(), properties.getProperty(propName));
+      }
+      return map;
+   }
+
+   /**
+    * Adopts the given buffer as this message's buffer (the reference is kept, no copy is made) and
+    * decodes the headers and properties from it.
+    *
+    * @param buffer the buffer holding the encoded message
+    */
+   public void decodeFromBuffer(final HornetQBuffer buffer)
+   {
+      this.buffer = buffer;
+
+      decode();
+   }
+
+   /**
+    * Records that the body was modified: copies the buffer if it has not been copied yet,
+    * invalidates the encoded buffer and forgets the cached end-of-body position.
+    */
+   public void bodyChanged()
+   {
+      // If the body is changed we must copy the buffer otherwise can affect the previously sent message
+      // which might be in the Netty write queue
+      checkCopy();
+
+      bufferValid = false;
+
+      // Recomputed from the writer index on the next getEndOfBodyPosition() call
+      endOfBodyPosition = -1;
+   }
+
+   /**
+    * Copies the buffer unless it is already marked as copied, then marks it as copied.
+    */
+   public synchronized void checkCopy()
+   {
+      if (!copied)
+      {
+         forceCopy();
+
+         copied = true;
+      }
+   }
+
+   /**
+    * Clears the copied flag, so the next {@link #checkCopy()} copies the buffer.
+    */
+   public synchronized void resetCopied()
+   {
+      copied = false;
+   }
+
+   /**
+    * @return the position in the whole buffer at which the encoded message ends
+    */
+   public int getEndOfMessagePosition()
+   {
+      return this.endOfMessagePosition;
+   }
+
+   /**
+    * Returns the position in the whole buffer at which the body ends. When no position is cached
+    * (negative value) the buffer's current writer index is taken and cached.
+    */
+   public int getEndOfBodyPosition()
+   {
+      if (endOfBodyPosition >= 0)
+      {
+         return endOfBodyPosition;
+      }
+
+      endOfBodyPosition = buffer.writerIndex();
+
+      return endOfBodyPosition;
+   }
+
+   // Encode to journal or paging
+   // Makes sure the internal buffer holds a valid encoding, then copies everything after the
+   // BUFFER_HEADER_SPACE prefix, up to the end of the message, into buff.
+   public void encode(final HornetQBuffer buff)
+   {
+      encodeToBuffer();
+
+      buff.writeBytes(buffer, BUFFER_HEADER_SPACE, endOfMessagePosition - BUFFER_HEADER_SPACE);
+   }
+
+   // Decode from journal or paging
+   // Counterpart of encode(HornetQBuffer): buff holds the message without the BUFFER_HEADER_SPACE
+   // prefix, so positions read from it are shifted by BUFFER_HEADER_SPACE relative to the internal buffer.
+   public void decode(final HornetQBuffer buff)
+   {
+      int start = buff.readerIndex();
+
+      endOfBodyPosition = buff.readInt();
+
+      // The int stored at the end of the body holds the end-of-message position
+      endOfMessagePosition = buff.getInt(endOfBodyPosition - BUFFER_HEADER_SPACE + start);
+
+      int length = endOfMessagePosition - BUFFER_HEADER_SPACE;
+
+      // Leave the header space untouched and append the encoded message after it
+      buffer.setIndex(0, BUFFER_HEADER_SPACE);
+
+      buffer.writeBytes(buff, start, length);
+
+      decode();
+
+      // Move the source buffer past the message that was just consumed
+      buff.readerIndex(start + length);
+   }
+
+   /**
+    * Returns a buffer holding the encoded message, with the reader index at 0 and the writer index
+    * at the end of the message.
+    * <p>
+    * The first call hands out the internal buffer itself and marks it as used; while it stays
+    * marked as used, later calls hand out a copy.
+    */
+   public synchronized HornetQBuffer getEncodedBuffer()
+   {
+      final HornetQBuffer encoded = encodeToBuffer();
+
+      if (!bufferUsed)
+      {
+         buffer.setIndex(0, endOfMessagePosition);
+
+         bufferUsed = true;
+
+         return buffer;
+      }
+
+      final HornetQBuffer duplicate = encoded.copy(0, encoded.capacity());
+
+      duplicate.setIndex(0, endOfMessagePosition);
+
+      return duplicate;
+   }
+
+   /**
+    * Sets the address without taking the monitor and without invalidating the encoded buffer,
+    * unlike {@link #setAddress(SimpleString)}.
+    *
+    * @param address the new address
+    */
+   public void setAddressTransient(final SimpleString address)
+   {
+      this.address = address;
+   }
+
+
+   // Properties
+   // ---------------------------------------------------------------------------------------
+
+   // Every put method stores the value in the typed properties, marks the encoded buffer as
+   // stale and returns this message.
+
+   // --- keyed by SimpleString
+
+   public Message putBooleanProperty(final SimpleString key, final boolean value)
+   {
+      properties.putBooleanProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putByteProperty(final SimpleString key, final byte value)
+   {
+      properties.putByteProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putBytesProperty(final SimpleString key, final byte[] value)
+   {
+      properties.putBytesProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   @Override
+   public Message putCharProperty(SimpleString key, char value)
+   {
+      properties.putCharProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   @Override
+   public Message putCharProperty(String key, char value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putCharProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putShortProperty(final SimpleString key, final short value)
+   {
+      properties.putShortProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putIntProperty(final SimpleString key, final int value)
+   {
+      properties.putIntProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putLongProperty(final SimpleString key, final long value)
+   {
+      properties.putLongProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putFloatProperty(final SimpleString key, final float value)
+   {
+      properties.putFloatProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putDoubleProperty(final SimpleString key, final double value)
+   {
+      properties.putDoubleProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putStringProperty(final SimpleString key, final SimpleString value)
+   {
+      properties.putSimpleStringProperty(key, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putObjectProperty(final SimpleString key, final Object value) throws HornetQPropertyConversionException
+   {
+      TypedProperties.setObjectProperty(key, value, properties);
+      bufferValid = false;
+      return this;
+   }
+
+   // --- keyed by String
+
+   public Message putObjectProperty(final String key, final Object value) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      putObjectProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putBooleanProperty(final String key, final boolean value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putBooleanProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putByteProperty(final String key, final byte value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putByteProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putBytesProperty(final String key, final byte[] value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putBytesProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putShortProperty(final String key, final short value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putShortProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putIntProperty(final String key, final int value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putIntProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putLongProperty(final String key, final long value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putLongProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putFloatProperty(final String key, final float value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putFloatProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putDoubleProperty(final String key, final double value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putDoubleProperty(name, value);
+      bufferValid = false;
+      return this;
+   }
+
+   public Message putStringProperty(final String key, final String value)
+   {
+      final SimpleString name = new SimpleString(key);
+      properties.putSimpleStringProperty(name, SimpleString.toSimpleString(value));
+      bufferValid = false;
+      return this;
+   }
+
+   // --- bulk
+
+   public Message putTypedProperties(final TypedProperties otherProps)
+   {
+      properties.putTypedProperties(otherProps);
+      bufferValid = false;
+      return this;
+   }
+
+   // Property readers: each one delegates to the typed properties; the String-keyed variants
+   // wrap the key in a SimpleString first.
+
+   public Object getObjectProperty(final SimpleString key)
+   {
+      return properties.getProperty(key);
+   }
+
+   public Boolean getBooleanProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getBooleanProperty(key);
+   }
+
+   public Boolean getBooleanProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getBooleanProperty(name);
+   }
+
+   public Byte getByteProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getByteProperty(key);
+   }
+
+   public Byte getByteProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getByteProperty(name);
+   }
+
+   public byte[] getBytesProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getBytesProperty(key);
+   }
+
+   public byte[] getBytesProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return getBytesProperty(name);
+   }
+
+   public Double getDoubleProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getDoubleProperty(key);
+   }
+
+   public Double getDoubleProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getDoubleProperty(name);
+   }
+
+   public Integer getIntProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getIntProperty(key);
+   }
+
+   public Integer getIntProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getIntProperty(name);
+   }
+
+   public Long getLongProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getLongProperty(key);
+   }
+
+   public Long getLongProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getLongProperty(name);
+   }
+
+   public Short getShortProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getShortProperty(key);
+   }
+
+   public Short getShortProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getShortProperty(name);
+   }
+
+   public Float getFloatProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getFloatProperty(key);
+   }
+
+   public Float getFloatProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getFloatProperty(name);
+   }
+
+   public String getStringProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      final SimpleString value = getSimpleStringProperty(key);
+
+      return value == null ? null : value.toString();
+   }
+
+   public String getStringProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return getStringProperty(name);
+   }
+
+   public SimpleString getSimpleStringProperty(final SimpleString key) throws HornetQPropertyConversionException
+   {
+      return properties.getSimpleStringProperty(key);
+   }
+
+   public SimpleString getSimpleStringProperty(final String key) throws HornetQPropertyConversionException
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getSimpleStringProperty(name);
+   }
+
+   public Object getObjectProperty(final String key)
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.getProperty(name);
+   }
+
+   // Removing a property marks the encoded buffer as stale, whether or not the key was present
+
+   public Object removeProperty(final SimpleString key)
+   {
+      bufferValid = false;
+
+      return properties.removeProperty(key);
+   }
+
+   public Object removeProperty(final String key)
+   {
+      bufferValid = false;
+
+      final SimpleString name = new SimpleString(key);
+      return properties.removeProperty(name);
+   }
+
+   public boolean containsProperty(final SimpleString key)
+   {
+      return properties.containsProperty(key);
+   }
+
+   public boolean containsProperty(final String key)
+   {
+      final SimpleString name = new SimpleString(key);
+      return properties.containsProperty(name);
+   }
+
+   public Set<SimpleString> getPropertyNames()
+   {
+      return properties.getPropertyNames();
+   }
+
+   /**
+    * @return the buffer backing this message (the instance itself, not a copy)
+    */
+   public HornetQBuffer getWholeBuffer()
+   {
+      return buffer;
+   }
+
+   /**
+    * @return a new encoder that reads from this message's whole buffer, starting at position 0
+    */
+   public BodyEncoder getBodyEncoder() throws HornetQException
+   {
+      return new DecodingContext();
+   }
+
+   /**
+    * @return the properties of this message (the instance itself, not a copy)
+    */
+   public TypedProperties getTypedProperties()
+   {
+      return this.properties;
+   }
+
+   /**
+    * Two messages are equal when both are {@link MessageImpl} instances with the same message ID.
+    */
+   @Override
+   public boolean equals(Object other)
+   {
+      if (this == other)
+      {
+         return true;
+      }
+
+      if (!(other instanceof MessageImpl))
+      {
+         return false;
+      }
+
+      final MessageImpl that = (MessageImpl) other;
+
+      return this.getMessageID() == that.getMessageID();
+   }
+
+   /**
+    * Debug Helper!!!!
+    *
+    * I'm leaving this message here without any callers for a reason:
+    * During debugs it's important eventually to identify what's on the bodies, and this method will give you a good idea about them.
+    * Add the message.bodyToString() to the Watch variables on the debugger view and this will show up like a charm!!!
+    *
+    * The reader indexes of the whole buffer and of the body buffer are restored after reading.
+    *
+    * @return a hex dump of the whole buffer and of the body buffer ("null" when no body buffer has been created yet)
+    */
+   public String bodyToString()
+   {
+      getEndOfBodyPosition();
+      int readerIndex1 = this.buffer.readerIndex();
+      buffer.readerIndex(0);
+      byte[] buffer1 = new byte[buffer.writerIndex()];
+      buffer.readBytes(buffer1);
+      buffer.readerIndex(readerIndex1);
+
+      // The body buffer is created lazily by getBodyBuffer(), so it may not exist yet
+      String bodyHex = "null";
+      if (bodyBuffer != null)
+      {
+         int readerIndex2 = this.bodyBuffer.readerIndex();
+         bodyBuffer.readerIndex(0);
+         byte[] buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
+         bodyBuffer.readBytes(buffer2);
+         bodyBuffer.readerIndex(readerIndex2);
+         bodyHex = ByteUtil.bytesToHex(buffer2, 1);
+      }
+
+      return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + bodyHex + "]";
+   }
+
+
+
+
+   /**
+    * Hash derived from the message ID only, consistent with {@link #equals(Object)}.
+    */
+   @Override
+   public int hashCode()
+   {
+      final long id = messageID;
+      final int folded = (int) (id ^ (id >>> 32));
+      return 31 + folded;
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * @return the properties of this message (the instance itself, not a copy); returns the same
+    *         object as {@link #getTypedProperties()}
+    */
+   public TypedProperties getProperties()
+   {
+      return properties;
+   }
+
+   // This must be synchronized as it can be called concurrently if the message is being delivered
+   // concurrently to
+   // many queues - the first caller in this case will actually encode it
+   //
+   // Buffer layout after encoding, starting at BUFFER_HEADER_SPACE:
+   // [int: end of body position][body][int: end of message position][headers and properties]
+   private synchronized HornetQBuffer encodeToBuffer()
+   {
+      if (!bufferValid)
+      {
+         if (bufferUsed)
+         {
+            // Cannot use same buffer - must copy
+
+            forceCopy();
+         }
+
+         // Despite the name, this is the position at which the body ends, not its length
+         int bodySize = getEndOfBodyPosition();
+
+         // Clebert: I've started sending this on encoding due to conversions between protocols
+         //          and making sure we are not losing the buffer start position between protocols
+         this.endOfBodyPosition = bodySize;
+
+         // write it
+         buffer.setInt(BUFFER_HEADER_SPACE, bodySize);
+
+         // Position at end of body and skip past the message end position int.
+         // check for enough room in the buffer even though it is dynamic
+         if ((bodySize + 4) > buffer.capacity())
+         {
+            // Writing a placeholder int makes the buffer grow; the real value is set below
+            buffer.setIndex(0, bodySize);
+            buffer.writeInt(0);
+         }
+         else
+         {
+            buffer.setIndex(0, bodySize + DataConstants.SIZE_INT);
+         }
+
+         encodeHeadersAndProperties(buffer);
+
+         // Write end of message position
+
+         endOfMessagePosition = buffer.writerIndex();
+
+         buffer.setInt(bodySize, endOfMessagePosition);
+
+         bufferValid = true;
+      }
+
+      return buffer;
+   }
+
+   // Reads the end-of-body position stored at BUFFER_HEADER_SPACE, skips the end-of-message int
+   // that follows the body, decodes the headers and properties and marks the buffer as valid.
+   private void decode()
+   {
+      endOfBodyPosition = buffer.getInt(BUFFER_HEADER_SPACE);
+
+      buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
+
+      decodeHeadersAndProperties(buffer);
+
+      // The reader index now sits just past the last property, i.e. at the end of the message
+      endOfMessagePosition = buffer.readerIndex();
+
+      bufferValid = true;
+   }
+
+   /**
+    * Replaces this message's buffer with a new dynamic buffer whose reader and writer indexes
+    * are both positioned at {@link #BODY_OFFSET}.
+    *
+    * @param initialMessageBufferSize the initial size of the new buffer
+    */
+   public void createBody(final int initialMessageBufferSize)
+   {
+      buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
+
+      // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+      buffer.writeByte((byte) 0);
+
+      buffer.setIndex(BODY_OFFSET, BODY_OFFSET);
+   }
+
+   // Replaces the buffer with a copy of itself, re-points the body buffer (if one was created)
+   // at the copy and clears the bufferUsed flag.
+   private void forceCopy()
+   {
+      // Must copy buffer before sending it
+
+      buffer = buffer.copy(0, buffer.capacity());
+
+      buffer.setIndex(0, getEndOfBodyPosition());
+
+      if (bodyBuffer != null)
+      {
+         bodyBuffer.setBuffer(buffer);
+      }
+
+      // The fresh copy has not been handed out to anyone yet
+      bufferUsed = false;
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * {@link BodyEncoder} that hands out this message's whole buffer in consecutive slices.
+    */
+   private final class DecodingContext implements BodyEncoder
+   {
+      /** Position in the whole buffer at which the next slice starts. */
+      private int lastPos = 0;
+
+      public DecodingContext()
+      {
+      }
+
+      /** Nothing to open. */
+      public void open()
+      {
+      }
+
+      /** Nothing to close. */
+      public void close()
+      {
+      }
+
+      /** @return the writer index of the message's whole buffer */
+      public long getLargeBodySize()
+      {
+         return buffer.writerIndex();
+      }
+
+      /** Wraps the NIO buffer and requests as many bytes as its capacity. */
+      public int encode(final ByteBuffer bufferRead) throws HornetQException
+      {
+         final HornetQBuffer wrapped = HornetQBuffers.wrappedBuffer(bufferRead);
+         return encode(wrapped, bufferRead.capacity());
+      }
+
+      /** Writes the next {@code size} bytes of the whole buffer to {@code bufferOut} and returns {@code size}. */
+      public int encode(final HornetQBuffer bufferOut, final int size)
+      {
+         final int from = lastPos;
+         bufferOut.writeBytes(getWholeBuffer(), from, size);
+         lastPos = from + size;
+         return size;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java
new file mode 100644
index 0000000..b341c25
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.message.impl;
+
+import java.io.InputStream;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.message.BodyEncoder;
+import org.apache.activemq6.utils.TypedProperties;
+
+/**
+ * A MessageInternal
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public interface MessageInternal extends Message
+{
+   /** Decodes this message from the given buffer. */
+   void decodeFromBuffer(HornetQBuffer buffer);
+
+   /** @return the position in the whole buffer at which the encoded message ends */
+   int getEndOfMessagePosition();
+
+   /** @return the position in the whole buffer at which the body ends */
+   int getEndOfBodyPosition();
+
+   /** Copies the underlying buffer if it has not been copied since the last {@link #resetCopied()}. */
+   void checkCopy();
+
+   /** Signals that the body of this message was modified. */
+   void bodyChanged();
+
+   /** Clears the copied state, so the next {@link #checkCopy()} copies the buffer again. */
+   void resetCopied();
+
+   // NOTE(review): presumably tells server-side messages apart from client-side ones - confirm against implementations
+   boolean isServerMessage();
+
+   /** @return a buffer holding this message in encoded form */
+   HornetQBuffer getEncodedBuffer();
+
+   /** @return the number of bytes {@link #encodeHeadersAndProperties(HornetQBuffer)} writes */
+   int getHeadersAndPropertiesEncodeSize();
+
+   /** @return the buffer backing this message */
+   HornetQBuffer getWholeBuffer();
+
+   /** Writes the headers and properties of this message to the given buffer. */
+   void encodeHeadersAndProperties(HornetQBuffer buffer);
+
+   /** Reads the headers and properties of this message from the given buffer. */
+   void decodeHeadersAndProperties(HornetQBuffer buffer);
+
+   /** @return an encoder for the body of this message */
+   BodyEncoder getBodyEncoder() throws HornetQException;
+
+   // NOTE(review): presumably the stream the body is read from when one was supplied - confirm against implementations
+   InputStream getBodyInputStream();
+
+   /** Sets the address of this message. NOTE(review): "transient" presumably means the encoded form is left alone - confirm. */
+   void setAddressTransient(SimpleString address);
+
+   /** @return the properties of this message */
+   TypedProperties getTypedProperties();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java
new file mode 100644
index 0000000..0f2f1e7
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol;
+
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.client.impl.ClientLargeMessageImpl;
+import org.apache.activemq6.core.client.impl.ClientMessageImpl;
+import org.apache.activemq6.core.protocol.core.Packet;
+import org.apache.activemq6.core.protocol.core.impl.PacketDecoder;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+/**
+ * {@link PacketDecoder} that creates SESS_RECEIVE_MSG and SESS_RECEIVE_LARGE_MSG packets around client message implementations and delegates every other packet type to its superclass.
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a> (10/12/12)
+ */
+public class ClientPacketDecoder extends PacketDecoder
+{
+   private static final long serialVersionUID = 6952614096979334582L;
+   public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
+
+   @Override
+   public  Packet decode(final HornetQBuffer in)
+   {
+      final byte packetType = in.readByte();
+
+      Packet packet = decode(packetType);
+
+      packet.decode(in);
+
+      return packet;
+   }
+
+   @Override
+   public Packet decode(byte packetType)
+   {
+      Packet packet;
+
+      switch (packetType)
+      {
+         case SESS_RECEIVE_MSG:
+         {
+            packet = new SessionReceiveMessage(new ClientMessageImpl());
+            break;
+         }
+         case SESS_RECEIVE_LARGE_MSG:
+         {
+            packet = new SessionReceiveClientLargeMessage(new ClientLargeMessageImpl());
+            break;
+         }
+         default:
+         {
+            packet = super.decode(packetType);
+         }
+      }
+      return packet;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java
new file mode 100644
index 0000000..4c82ef2
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core;
+
+import java.util.concurrent.locks.Lock;
+
+import org.apache.activemq6.api.core.HornetQException;
+
+/**
+ * A channel is a way of interleaving data meant for different endpoints over the same {@link org.apache.activemq6.core.protocol.core.CoreRemotingConnection}.
+ * <p>
+ * Any packet sent will have its channel id set to the specific channel sending so it can be routed to its correct channel
+ * when received by the {@link org.apache.activemq6.core.protocol.core.CoreRemotingConnection}. See {@link org.apache.activemq6.core.protocol.core.Packet#setChannelID(long)}.
+ * <p>
+ * Each Channel should forward any packets received to its {@link org.apache.activemq6.core.protocol.core.ChannelHandler}.
+ * <p>
+ * A Channel *does not* support concurrent access by more than one thread!
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface Channel
+{
+   /**
+    * Returns the id of this channel.
+    * @return the id
+    */
+   long getID();
+
+   /** For protocol check */
+   boolean supports(byte packetID);
+
+   /**
+    * Sends a packet on this channel.
+    * @param packet the packet to send
+    * @return false if the packet was rejected by an outgoing interceptor; true if the send was
+    *         successful
+    */
+   boolean send(Packet packet);
+
+   /**
+    * Sends a packet on this channel using batching algorithm if appropriate
+    * @param packet the packet to send
+    * @return false if the packet was rejected by an outgoing interceptor; true if the send was
+    *         successful
+    */
+   boolean sendBatched(Packet packet);
+
+   /**
+    * Sends a packet on this channel and then blocks until it has been written to the connection.
+    * @param packet the packet to send
+    * @return false if the packet was rejected by an outgoing interceptor; true if the send was
+    *         successful
+    */
+   boolean sendAndFlush(Packet packet);
+
+   /**
+    * Sends a packet on this channel and then blocks until a response is received or a timeout
+    * occurs.
+    * @param packet the packet to send
+    * @param expectedPacket the packet being expected.
+    * @return the response
+    * @throws HornetQException if an error occurs during the send
+    */
+   Packet sendBlocking(Packet packet, byte expectedPacket) throws HornetQException;
+
+   /**
+    * Sets the {@link org.apache.activemq6.core.protocol.core.ChannelHandler} that this channel should
+    * forward received packets to.
+    * @param handler the handler
+    */
+   void setHandler(ChannelHandler handler);
+
+   /**
+    * Gets the {@link org.apache.activemq6.core.protocol.core.ChannelHandler} that this channel should
+    * forward received packets to.
+    * @return the current channel handler
+    */
+   ChannelHandler getHandler();
+
+   /**
+    * Closes this channel.
+    * <p>
+    * Once closed, no packets can be sent.
+    */
+   void close();
+
+   /**
+    * Transfers the connection used by this channel to the one specified.
+    * <p>
+    * All new packets will be sent via this connection.
+    * @param newConnection the new connection
+    */
+   void transferConnection(CoreRemotingConnection newConnection);
+
+   /**
+    * Resends any packets that have not received confirmations yet.
+    * <p>
+    * Typically called after a connection has been transferred.
+    *
+    * @param lastConfirmedCommandID the last confirmed packet
+    */
+   void replayCommands(int lastConfirmedCommandID);
+
+   /**
+    * Returns the last confirmed packet command id.
+    *
+    * @return the id
+    */
+   int getLastConfirmedCommandID();
+
+   /**
+    * Locks the channel.
+    * <p>
+    * While locked, no packets can be sent or received.
+    */
+   void lock();
+
+   /**
+    * Unlocks the channel.
+    */
+   void unlock();
+
+   /**
+    * Forces any {@link org.apache.activemq6.core.protocol.core.Channel#sendBlocking(Packet, byte)} request to return with an exception.
+    */
+   void returnBlocking();
+
+   /**
+    * Forces any {@link org.apache.activemq6.core.protocol.core.Channel#sendBlocking(Packet, byte)} request to return with an exception; {@code cause} is presumably attached to it (TODO confirm against the implementation).
+    */
+   void returnBlocking(Throwable cause);
+
+   /**
+    * Returns the channel lock.
+    *
+    * @return the lock
+    */
+   Lock getLock();
+
+   /**
+    * Returns the {@link CoreRemotingConnection} being used by the channel.
+    */
+   CoreRemotingConnection getConnection();
+
+   /**
+    * Sends a confirmation of a packet being received.
+    *
+    * @param packet the packet to confirm
+    */
+   void confirm(Packet packet);
+
+   /**
+    * Sets the handler to use when a confirmation is received.
+    *
+    * @param handler the handler to call
+    */
+   void setCommandConfirmationHandler(CommandConfirmationHandler handler);
+
+   /**
+    * Flushes any confirmations on to the connection.
+    */
+   void flushConfirmations();
+
+   /**
+    * Called by {@link org.apache.activemq6.core.protocol.core.CoreRemotingConnection} when a packet is received.
+    * <p>
+    * This method should then call its {@link org.apache.activemq6.core.protocol.core.ChannelHandler} after appropriate processing of
+    * the packet
+    *
+    * @param packet the packet to process.
+    */
+   void handlePacket(Packet packet);
+
+   /**
+    * Clears any commands from the cache that are yet to be confirmed.
+    */
+   void clearCommands();
+
+   /**
+    * Returns the confirmation window size this channel is using.
+    *
+    * @return the window size
+    */
+   int getConfirmationWindowSize();
+
+   /**
+    * Notifies the channel if it is transferring its connection. When true it is illegal to send messages.
+    *
+    * @param transferring whether the channel is transferring
+    */
+   void setTransferring(boolean transferring);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java
new file mode 100644
index 0000000..0fce6da
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core;
+
+
+/**
+ * A ChannelHandler is used by {@link Channel}. When a channel receives a packet it will call its handler to deal with the
+ * packet.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface ChannelHandler
+{
+   /**
+    * Called by the channel when a packet is received.
+    *
+    * @param packet the packet received
+    */
+   void handlePacket(Packet packet);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java
new file mode 100644
index 0000000..fe7fda9
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core;
+
+
+/**
+ * A CommandConfirmationHandler is called by the channel when a confirmation of a packet has been received.
+ * <p>
+ * Created 9 Feb 2009 12:39:11
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface CommandConfirmationHandler
+{
+   /**
+    * Called by the channel after a confirmation has been received.
+    *
+    * @param packet the packet confirmed
+    */
+   void commandConfirmed(Packet packet);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java
new file mode 100644
index 0000000..513aa70
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core;
+
+import org.apache.activemq6.core.security.HornetQPrincipal;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+
+
+/**
+ * Extension of RemotingConnection for the HornetQ core protocol
+ * @author Tim Fox
+ */
+public interface CoreRemotingConnection extends RemotingConnection
+{
+
+   /** Returns the client protocol used on the communication.
+    *  This will determine if the client has support for certain packet types */
+   int getClientVersion();
+
+   /**
+    * Sets the client protocol used on the communication. This will determine if the client has
+    * support for certain packet types
+    */
+   void setClientVersion(int clientVersion);
+
+   /**
+    * Returns the channel with the channel id specified.
+    * <p>
+    * If it does not exist create it with the confirmation window size.
+    * @param channelID the channel id
+    * @param confWindowSize the confirmation window size
+    * @return the channel
+    */
+   Channel getChannel(long channelID, int confWindowSize);
+
+   /**
+    * Adds the channel with the specified channel id.
+    *
+    * @param channelID the channel id
+    * @param channel   the channel
+    */
+   void putChannel(long channelID, Channel channel);
+
+   /**
+    * Removes the channel with the specified channel id.
+    *
+    * @param channelID the channel id
+    * @return true if removed
+    */
+   boolean removeChannel(long channelID);
+
+   /**
+    * Generates a unique (within this connection) channel id.
+    *
+    * @return the id
+    */
+   long generateChannelID();
+
+   /**
+    * Resets the id generator used to generate ids.
+    * @param id the first id to set it to
+    */
+   void syncIDGeneratorSequence(long id);
+
+   /**
+    * Returns the next id to be chosen.
+    * @return the id
+    */
+   long getIDGeneratorSequence();
+
+   /**
+    * Returns the current timeout for blocking calls
+    * @return the timeout in milliseconds
+    */
+   long getBlockingCallTimeout();
+
+   /**
+    * Returns the current failover timeout for blocking calls
+    * @return the timeout in milliseconds
+    */
+   long getBlockingCallFailoverTimeout();
+
+   /**
+    * Returns the transfer lock used when transferring connections.
+    * @return the lock
+    */
+   Object getTransferLock();
+
+   /**
+    * Returns the default security principal
+    * @return the principal
+    */
+   HornetQPrincipal getDefaultHornetQPrincipal();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java
new file mode 100644
index 0000000..6fb6359
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+
+/**
+ * A Packet represents a packet of data transmitted over a connection.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public interface Packet
+{
+   /**
+    * Sets the channel id so that, once the packet has been successfully decoded, it is
+    * sent to the correct channel.
+    *
+    * @param channelID the id of the channel to handle the packet
+    */
+   void setChannelID(long channelID);
+
+   /**
+    * Returns the channel id of the channel that should handle this packet.
+    *
+    * @return the id of the channel
+    */
+   long getChannelID();
+
+   /**
+    * Returns true if this packet is being sent in response to a previously received packet.
+    *
+    * @return true if a response
+    */
+   boolean isResponse();
+
+   /**
+    * Returns the type of the packet.
+    * <p>
+    * This is needed when decoding the packet
+    *
+    * @return the packet type
+    */
+   byte getType();
+
+   /**
+    * Encodes the packet and returns a {@link org.apache.activemq6.api.core.HornetQBuffer} containing the data
+    *
+    * @param connection the connection
+    * @return the buffer containing the encoded packet
+    */
+   HornetQBuffer encode(RemotingConnection connection);
+
+   /**
+    * Decodes the buffer into this packet.
+    *
+    * @param buffer the buffer to decode from
+    */
+   void decode(HornetQBuffer buffer);
+
+   /**
+    * Returns the size needed to encode this packet.
+    *
+    * @return The size of the entire packet including headers, and extra data
+    */
+   int getPacketSize();
+
+   /**
+    * Returns true if a confirmation should be sent on receipt of this packet.
+    *
+    * @return true if confirmation is required
+    */
+   boolean isRequiresConfirmations();
+
+   boolean isAsyncExec();
+}


Mime
View raw message