activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [34/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:04 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java
new file mode 100644
index 0000000..78aae2f
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java
@@ -0,0 +1,1765 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.JMSSecurityException;
+import javax.jms.ResourceAllocationException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DataArrayResponse;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.ConsumerState;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.state.SessionState;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.TransmitCallback;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.HornetQSecurityException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.openwire.amq.AMQBrokerStoppedException;
+import org.hornetq.core.protocol.openwire.amq.AMQConnectionContext;
+import org.hornetq.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
+import org.hornetq.core.protocol.openwire.amq.AMQMapTransportConnectionStateRegister;
+import org.hornetq.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
+import org.hornetq.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.hornetq.core.protocol.openwire.amq.AMQSession;
+import org.hornetq.core.protocol.openwire.amq.AMQSingleTransportConnectionStateRegister;
+import org.hornetq.core.protocol.openwire.amq.AMQTransaction;
+import org.hornetq.core.protocol.openwire.amq.AMQTransportConnectionState;
+import org.hornetq.core.protocol.openwire.amq.AMQTransportConnectionStateRegister;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
+import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.utils.ConcurrentHashSet;
+
+/**
+ * Represents an activemq connection.
+ * @author howard
+ *
+ */
+public class OpenWireConnection implements RemotingConnection, CommandVisitor
+{
+   // Protocol manager that owns this connection; commands not handled locally are delegated to it.
+   private final OpenWireProtocolManager protocolManager;
+
+   // Underlying transport connection that marshalled commands are written to.
+   private final Connection transportConnection;
+
+   // Wrapper around the Acceptor passed to the constructor.
+   private final AMQConnectorImpl acceptorUsed;
+
+   // Value of System.currentTimeMillis() taken in the constructor.
+   private final long creationTime;
+
+   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
+   private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>();
+
+   // Set to true by fail() and destroy(); reported by isDestroyed().
+   private boolean destroyed = false;
+
+   // Held while writing to the transport in physicalSend() and while calling close listeners in destroy().
+   private final Object sendLock = new Object();
+
+   // Read and cleared by checkDataReceived().
+   // NOTE(review): nothing in the reviewed part of this class sets it to true - confirm against the rest of the file.
+   private boolean dataReceived;
+
+   // Marshals/unmarshals commands; renegotiated when a WireFormatInfo arrives.
+   private OpenWireFormat wireFormat;
+
+   // Starts as a single-state register; registerConnectionState() may swap in a map-based one.
+   private AMQTransportConnectionStateRegister connectionStateRegister = new AMQSingleTransportConnectionStateRegister();
+
+   private boolean faultTolerantConnection;
+
+   // Created in processAddConnection(); bufferReceived() resets it to null after each visited command.
+   private AMQConnectionContext context;
+
+   private AMQMessageAuthorizationPolicy messageAuthorizationPolicy;
+
+   private boolean networkConnection;
+
+   private boolean manageable;
+
+   // Once true, bufferReceived() answers visited commands with an ExceptionResponse carrying stopError.
+   private boolean pendingStop;
+
+   private Throwable stopError = null;
+
+   // should come from hornetq server
+   // NOTE(review): this is final and initialised to null, so every stopTaskRunnerFactory.execute(...)
+   // call throws a NullPointerException, which the callers catch.
+   private final TaskRunnerFactory stopTaskRunnerFactory = null;
+
+   private boolean starting;
+
+   // Flipped to true exactly once by stopAsync(); checked before dispatching.
+   private final AtomicBoolean stopping = new AtomicBoolean(false);
+
+   private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
+
+   // Counted down by stopAsync() once the stop task has run or could not be scheduled.
+   private final CountDownLatch stopped = new CountDownLatch(1);
+
+   // When null, dispatchAsync() falls back to dispatching synchronously.
+   protected TaskRunner taskRunner;
+
+   private boolean active;
+
+   // Commands queued by dispatchAsync(); accessed under synchronized (dispatchQueue).
+   protected final List<Command> dispatchQueue = new LinkedList<Command>();
+
+   private boolean markedCandidate;
+
+   private boolean blockedCandidate;
+
+   private long timeStamp;
+
+   // Re-entrancy flag used by serviceException().
+   private boolean inServiceException;
+
+   // Ensures serviceExceptionAsync() starts its handler thread at most once.
+   private final AtomicBoolean asyncException = new AtomicBoolean(false);
+
+   private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, AMQConsumerBrokerExchange>();
+   private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, AMQProducerBrokerExchange>();
+
+   // Connection state selected or created in processAddConnection().
+   private AMQTransportConnectionState state;
+
+   // Names of the queues that deleteTempQueues() removes when the connection is destroyed.
+   private final Set<String> tempQueues = new ConcurrentHashSet<String>();
+
+   // Obtained from protocolManager.getConnectionStates(); processAddConnection() synchronizes on it.
+   protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
+
+   // Accumulates received bytes until a whole command can be unmarshalled.
+   private DataInputWrapper dataInput = new DataInputWrapper();
+
+   private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
+
+   private volatile AMQSession advisorySession;
+
+   /**
+    * Creates the server-side object for one OpenWire client connection.
+    *
+    * @param acceptorUsed the acceptor the connection came in on; wrapped in an {@link AMQConnectorImpl}
+    * @param connection the underlying transport connection
+    * @param openWireProtocolManager the protocol manager that owns this connection
+    * @param wf the wire format used to marshal and unmarshal commands
+    */
+   public OpenWireConnection(Acceptor acceptorUsed, Connection connection,
+         OpenWireProtocolManager openWireProtocolManager, OpenWireFormat wf)
+   {
+      protocolManager = openWireProtocolManager;
+      transportConnection = connection;
+      this.acceptorUsed = new AMQConnectorImpl(acceptorUsed);
+      wireFormat = wf;
+      // the connection-state map is shared with the protocol manager
+      this.brokerConnectionStates = openWireProtocolManager.getConnectionStates();
+      creationTime = System.currentTimeMillis();
+   }
+
+   /**
+    * Feeds newly received bytes into {@code dataInput} and then unmarshals and handles
+    * as many complete commands as the accumulated data contains.
+    * <p>
+    * KeepAliveInfo and WireFormatInfo are handled directly; a fixed set of command classes
+    * is dispatched through {@code visit(this)}; everything else is delegated to the
+    * protocol manager. Errors are logged and do not propagate to the caller.
+    *
+    * @param connectionID not used by this method
+    * @param buffer the bytes received from the transport
+    */
+   @Override
+   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   {
+      try
+      {
+         dataInput.receiveData(buffer);
+      }
+      catch (Throwable t)
+      {
+         // the buffer could not be absorbed; drop it and wait for the next call
+         HornetQServerLogger.LOGGER.error("decoding error", t);
+         return;
+      }
+
+      // NOTE(review): the call below is disabled, so this method never records that data arrived.
+      // this.setDataReceived();
+      while (dataInput.readable())
+      {
+         try
+         {
+            Object object = null;
+            try
+            {
+               object = wireFormat.unmarshal(dataInput);
+               // remember the position after the last fully decoded command
+               dataInput.mark();
+            }
+            catch (NotEnoughBytesException e)
+            {
+               // dataInput does not yet hold a complete command:
+               // return and wait for the next call of bufferReceived()
+               return;
+            }
+
+            Command command = (Command) object;
+            // captured up front because the KeepAliveInfo branch below clears the flag on the command
+            boolean responseRequired = command.isResponseRequired();
+            int commandId = command.getCommandId();
+            // the connection handles pings and negotiations directly
+            // and delegates all other commands to the manager.
+            if (command.getClass() == KeepAliveInfo.class)
+            {
+               KeepAliveInfo info = (KeepAliveInfo) command;
+               if (info.isResponseRequired())
+               {
+                  // echo the keep-alive back, without asking the peer to answer in turn
+                  info.setResponseRequired(false);
+                  protocolManager.sendReply(this, info);
+               }
+            }
+            else if (command.getClass() == WireFormatInfo.class)
+            {
+               // amq here starts a read/write monitor thread (detect ttl?)
+               negotiate((WireFormatInfo) command);
+            }
+            else if (command.getClass() == ConnectionInfo.class
+                  || command.getClass() == ConsumerInfo.class
+                  || command.getClass() == RemoveInfo.class
+                  || command.getClass() == SessionInfo.class
+                  || command.getClass() == ProducerInfo.class
+                  || ActiveMQMessage.class.isAssignableFrom(command.getClass())
+                  || command.getClass() == MessageAck.class
+                  || command.getClass() == TransactionInfo.class
+                  || command.getClass() == DestinationInfo.class
+                  || command.getClass() == ShutdownInfo.class)
+            {
+               Response response = null;
+
+               if (pendingStop)
+               {
+                  // a stop is pending: do not process the command, answer with the recorded error
+                  response = new ExceptionResponse(this.stopError);
+               }
+               else
+               {
+                  response = ((Command) command).visit(this);
+
+                  if (response instanceof ExceptionResponse)
+                  {
+                     if (!responseRequired)
+                     {
+                        // the peer does not expect a reply, so report the failure
+                        // through serviceException() instead of a response
+                        Throwable cause = ((ExceptionResponse)response).getException();
+                        serviceException(cause);
+                        response = null;
+                     }
+                  }
+               }
+
+               if (responseRequired)
+               {
+                  if (response == null)
+                  {
+                     // plain acknowledgement for commands that produced no specific response
+                     response = new Response();
+                  }
+               }
+
+               // The context may have been flagged so that the response is not
+               // sent.
+               if (context != null)
+               {
+                  if (context.isDontSendReponse())
+                  {
+                     context.setDontSendReponse(false);
+                     response = null;
+                  }
+                  context = null;
+               }
+
+               if (response != null && !protocolManager.isStopping())
+               {
+                  response.setCorrelationId(commandId);
+                  dispatchSync(response);
+               }
+
+            }
+            else
+            {
+               // note!!! wait for negotiation (e.g. use a countdown latch)
+               // before handling any other commands
+               this.protocolManager.handleCommand(this, command);
+            }
+         }
+         catch (IOException e)
+         {
+            // logged only; the loop carries on with whatever dataInput still reports as readable
+            HornetQServerLogger.LOGGER.error("error decoding", e);
+         }
+         catch (Throwable t)
+         {
+            // same handling as the IOException case above
+            HornetQServerLogger.LOGGER.error("error decoding", t);
+         }
+      }
+   }
+
+   /**
+    * Applies the peer's wire format preferences to this connection's wire format.
+    *
+    * @param command the WireFormatInfo received from the peer
+    * @throws IOException if the wire format rejects the renegotiation
+    */
+   private void negotiate(WireFormatInfo command) throws IOException
+   {
+      wireFormat.renegotiateWireFormat(command);
+   }
+
+   /**
+    * @return the ID of the underlying transport connection
+    */
+   @Override
+   public Object getID()
+   {
+      final Object transportId = transportConnection.getID();
+      return transportId;
+   }
+
+   /**
+    * @return the {@code System.currentTimeMillis()} value recorded when this object was constructed
+    */
+   @Override
+   public long getCreationTime()
+   {
+      return this.creationTime;
+   }
+
+   /**
+    * @return the remote address reported by the underlying transport connection
+    */
+   @Override
+   public String getRemoteAddress()
+   {
+      final String peerAddress = transportConnection.getRemoteAddress();
+      return peerAddress;
+   }
+
+   /**
+    * Registers a listener to be called when this connection fails.
+    *
+    * @param listener the listener to add; must not be null
+    * @throws IllegalStateException if {@code listener} is null
+    */
+   @Override
+   public void addFailureListener(FailureListener listener)
+   {
+      if (listener != null)
+      {
+         failureListeners.add(listener);
+         return;
+      }
+
+      throw new IllegalStateException("FailureListener cannot be null");
+   }
+
+   /**
+    * Unregisters a failure listener.
+    *
+    * @param listener the listener to remove; must not be null
+    * @return true if the listener was registered and has been removed
+    * @throws IllegalStateException if {@code listener} is null
+    */
+   @Override
+   public boolean removeFailureListener(FailureListener listener)
+   {
+      if (listener != null)
+      {
+         final boolean wasRegistered = failureListeners.remove(listener);
+         return wasRegistered;
+      }
+
+      throw new IllegalStateException("FailureListener cannot be null");
+   }
+
+   /**
+    * Registers a listener to be called when this connection is closed.
+    *
+    * @param listener the listener to add; must not be null
+    * @throws IllegalStateException if {@code listener} is null
+    */
+   @Override
+   public void addCloseListener(CloseListener listener)
+   {
+      if (listener != null)
+      {
+         closeListeners.add(listener);
+         return;
+      }
+
+      throw new IllegalStateException("CloseListener cannot be null");
+   }
+
+   /**
+    * Unregisters a close listener.
+    *
+    * @param listener the listener to remove; must not be null
+    * @return true if the listener was registered and has been removed
+    * @throws IllegalStateException if {@code listener} is null
+    */
+   @Override
+   public boolean removeCloseListener(CloseListener listener)
+   {
+      if (listener != null)
+      {
+         final boolean wasRegistered = closeListeners.remove(listener);
+         return wasRegistered;
+      }
+
+      throw new IllegalStateException("CloseListener cannot be null");
+   }
+
+   /**
+    * Removes every registered close listener.
+    *
+    * @return a copy of the listeners that were registered before the call
+    */
+   @Override
+   public List<CloseListener> removeCloseListeners()
+   {
+      final List<CloseListener> previous = new ArrayList<CloseListener>(closeListeners);
+      closeListeners.clear();
+      return previous;
+   }
+
+   /**
+    * Replaces the registered close listeners with the given ones.
+    *
+    * @param listeners the listeners to register in place of the current ones
+    */
+   @Override
+   public void setCloseListeners(List<CloseListener> listeners)
+   {
+      // drop the old registrations first, then take over the new list's content
+      this.closeListeners.clear();
+      this.closeListeners.addAll(listeners);
+   }
+
+   /**
+    * Always reports an empty list, whatever is actually registered.
+    *
+    * @return an empty list
+    */
+   @Override
+   public List<FailureListener> getFailureListeners()
+   {
+      // the real listeners are deliberately hidden: if they were returned,
+      // the remoting service would NOT destroy the connection.
+      return Collections.<FailureListener>emptyList();
+   }
+
+   /**
+    * Removes every registered failure listener.
+    *
+    * @return a copy of the listeners that were registered before the call
+    */
+   @Override
+   public List<FailureListener> removeFailureListeners()
+   {
+      final List<FailureListener> previous = new ArrayList<FailureListener>(failureListeners);
+      failureListeners.clear();
+      return previous;
+   }
+
+   /**
+    * Replaces the registered failure listeners with the given ones.
+    *
+    * @param listeners the listeners to register in place of the current ones
+    */
+   @Override
+   public void setFailureListeners(List<FailureListener> listeners)
+   {
+      // drop the old registrations first, then take over the new list's content
+      this.failureListeners.clear();
+      this.failureListeners.addAll(listeners);
+   }
+
+   /**
+    * Creates a dynamic buffer via {@code HornetQBuffers.dynamicBuffer(size)}.
+    *
+    * @param size the size passed on to the buffer factory
+    * @return the newly created buffer
+    */
+   @Override
+   public HornetQBuffer createBuffer(int size)
+   {
+      final HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(size);
+      return dynamic;
+   }
+
+   /**
+    * Fails this connection: notifies the failure listeners and the close listeners,
+    * marks the connection as destroyed and closes the transport.
+    *
+    * @param me the cause of the failure; may be null (disconnect() passes null), in which
+    *           case nothing is logged and the failure listeners receive null
+    */
+   @Override
+   public void fail(HornetQException me)
+   {
+      // Guard the logging: dereferencing a null cause here used to abort the whole
+      // failure handling with a NullPointerException before any listener ran.
+      if (me != null)
+      {
+         HornetQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(),
+               me.getType());
+      }
+
+      // Then call the listeners
+      callFailureListeners(me);
+
+      callClosingListeners();
+
+      destroyed = true;
+
+      transportConnection.close();
+   }
+
+   /**
+    * Destroys this connection: marks it destroyed, closes the transport, deletes the
+    * temporary queues recorded for it and finally calls the close listeners.
+    * A failure while deleting temporary queues is logged and does not prevent the
+    * close listeners from being called.
+    */
+   @Override
+   public void destroy()
+   {
+      destroyed = true;
+
+      transportConnection.close();
+
+      try
+      {
+         deleteTempQueues();
+      }
+      catch (Exception e)
+      {
+         // best effort: keep going, but leave a trace instead of hiding the problem
+         HornetQServerLogger.LOGGER.warn("Failed to delete temporary queues while destroying connection", e);
+      }
+
+      // close listeners are called while holding the same lock physicalSend() writes under
+      synchronized (sendLock)
+      {
+         callClosingListeners();
+      }
+   }
+
+   /**
+    * Asks the protocol manager to delete every queue whose name is held in {@code tempQueues}.
+    * The first failing deletion aborts the loop.
+    *
+    * @throws Exception whatever the protocol manager throws while deleting a queue
+    */
+   private void deleteTempQueues() throws Exception
+   {
+      for (String queueName : tempQueues)
+      {
+         protocolManager.deleteQueue(queueName);
+      }
+   }
+
+   /**
+    * @return the underlying transport connection given to the constructor
+    */
+   @Override
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   /**
+    * @return always false
+    */
+   @Override
+   public boolean isClient()
+   {
+      final boolean clientSide = false;
+      return clientSide;
+   }
+
+   /**
+    * @return true once fail() or destroy() has marked this connection as destroyed
+    */
+   @Override
+   public boolean isDestroyed()
+   {
+      return this.destroyed;
+   }
+
+   /**
+    * Disconnects by failing the connection without a cause.
+    *
+    * @param criticalError not used by this implementation
+    */
+   @Override
+   public void disconnect(boolean criticalError)
+   {
+      // no exception is available here, so fail() is handed a null cause
+      final HornetQException noCause = null;
+      fail(noCause);
+   }
+
+   /**
+    * Reports the {@code dataReceived} flag and resets it.
+    *
+    * @return the value the flag had before it was reset to false
+    */
+   @Override
+   public boolean checkDataReceived()
+   {
+      try
+      {
+         // the flag is read here, before the finally block clears it
+         return dataReceived;
+      }
+      finally
+      {
+         dataReceived = false;
+      }
+   }
+
+   /**
+    * Does nothing in this implementation.
+    */
+   @Override
+   public void flush()
+   {
+      // intentionally empty
+   }
+
+   /**
+    * Calls {@code connectionFailed(me, false)} on a snapshot of the registered failure listeners.
+    *
+    * @param me the failure passed on to each listener
+    */
+   private void callFailureListeners(final HornetQException me)
+   {
+      // work on a snapshot so that listeners may (un)register while being notified
+      final FailureListener[] snapshot = failureListeners.toArray(new FailureListener[0]);
+
+      for (int i = 0; i < snapshot.length; i++)
+      {
+         try
+         {
+            snapshot[i].connectionFailed(me, false);
+         }
+         catch (final Throwable problem)
+         {
+            // one misbehaving listener must not stop the remaining ones from running
+            HornetQServerLogger.LOGGER.errorCallingFailureListener(problem);
+         }
+      }
+   }
+
+   /**
+    * Calls {@code connectionClosed()} on a snapshot of the registered close listeners.
+    */
+   private void callClosingListeners()
+   {
+      // work on a snapshot so that listeners may (un)register while being notified
+      final CloseListener[] snapshot = closeListeners.toArray(new CloseListener[0]);
+
+      for (int i = 0; i < snapshot.length; i++)
+      {
+         try
+         {
+            snapshot[i].connectionClosed();
+         }
+         catch (final Throwable problem)
+         {
+            // one misbehaving listener must not stop the remaining ones from running
+            HornetQServerLogger.LOGGER.errorCallingFailureListener(problem);
+         }
+      }
+   }
+
+   /**
+    * Sends this side's preferred WireFormatInfo to the peer through the protocol manager.
+    */
+   public void init()
+   {
+      protocolManager.send(this, wireFormat.getPreferedWireFormatInfo());
+   }
+
+   /**
+    * @return the connection state set by processAddConnection(), or null if none has been set
+    */
+   public ConnectionState getState()
+   {
+      return this.state;
+   }
+
+   /**
+    * Marshals a command with the connection's wire format and writes it to the transport.
+    * The write itself happens under {@code sendLock}; marshalling happens before the lock is taken.
+    *
+    * @param command the command to send
+    * @throws IOException if marshalling or writing raises one; any other failure is only logged
+    */
+   public void physicalSend(Command command) throws IOException
+   {
+      try
+      {
+         final HornetQBuffer payload = OpenWireUtil.toHornetQBuffer(wireFormat.marshal(command));
+         synchronized (sendLock)
+         {
+            getTransportConnection().write(payload, false, false);
+         }
+      }
+      catch (IOException ioFailure)
+      {
+         // I/O problems are the caller's business
+         throw ioFailure;
+      }
+      catch (Throwable other)
+      {
+         HornetQServerLogger.LOGGER.error("error sending", other);
+      }
+   }
+
+   /**
+    * Handles a ConnectionInfo command: selects or creates the shared connection state,
+    * builds the AMQConnectionContext for this connection and registers the connection
+    * with the protocol manager.
+    *
+    * @param info the connection request received from the peer
+    * @return an ExceptionResponse if the protocol manager rejected the connection, otherwise null
+    * @throws Exception declared by the visitor interface
+    */
+   @Override
+   public Response processAddConnection(ConnectionInfo info) throws Exception
+   {
+      WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
+      // Older clients should have been defaulting this field to true.. but
+      // they were not.
+      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2)
+      {
+         info.setClientMaster(true);
+      }
+
+      // Make sure 2 concurrent connections by the same ID only generate 1
+      // TransportConnectionState object.
+      synchronized (brokerConnectionStates)
+      {
+         state = (AMQTransportConnectionState) brokerConnectionStates.get(info
+               .getConnectionId());
+         if (state == null)
+         {
+            state = new AMQTransportConnectionState(info, this);
+            brokerConnectionStates.put(info.getConnectionId(), state);
+         }
+         state.incrementReference();
+      }
+      // If there are 2 concurrent connections for the same connection id,
+      // then last one in wins, we need to sync here
+      // to figure out the winner.
+      synchronized (state.getConnectionMutex())
+      {
+         if (state.getConnection() != this)
+         {
+            // another connection object owns this state: disconnect it and take over
+            state.getConnection().disconnect(true);
+            state.setConnection(this);
+            state.reset(info);
+         }
+      }
+
+      registerConnectionState(info.getConnectionId(), state);
+
+      this.faultTolerantConnection = info.isFaultTolerant();
+      // Setup the context.
+      String clientId = info.getClientId();
+      context = new AMQConnectionContext();
+      context.setBroker(protocolManager);
+      context.setClientId(clientId);
+      context.setClientMaster(info.isClientMaster());
+      context.setConnection(this);
+      context.setConnectionId(info.getConnectionId());
+      // for now we pass the manager as the connector and see what happens
+      // it should be related to hornetq's Acceptor
+      context.setConnector(this.acceptorUsed);
+      context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
+      context.setNetworkConnection(networkConnection);
+      context.setFaultTolerant(faultTolerantConnection);
+      context
+            .setTransactions(new ConcurrentHashMap<TransactionId, AMQTransaction>());
+      context.setUserName(info.getUserName());
+      context.setWireFormatInfo(wireFormatInfo);
+      context.setReconnect(info.isFailoverReconnect());
+      this.manageable = info.isManageable();
+      context.setConnectionState(state);
+      state.setContext(context);
+      state.setConnection(this);
+      if (info.getClientIp() == null)
+      {
+         // the client did not report its address; fall back to the one seen by the transport
+         info.setClientIp(getRemoteAddress());
+      }
+
+      try
+      {
+         protocolManager.addConnection(context, info);
+      }
+      catch (Exception e)
+      {
+         // registration failed: undo the state bookkeeping done above
+         synchronized (brokerConnectionStates)
+         {
+            brokerConnectionStates.remove(info.getConnectionId());
+         }
+         unregisterConnectionState(info.getConnectionId());
+
+         if (e instanceof SecurityException)
+         {
+            // close this down - in case the peer of this transport doesn't play
+            // nice
+            delayedStop(2000,
+                  "Failed with SecurityException: " + e.getLocalizedMessage(),
+                  e);
+         }
+         Response resp = new ExceptionResponse(e);
+         return resp;
+      }
+      if (info.isManageable())
+      {
+         // send ConnectionCommand
+         ConnectionControl command = this.acceptorUsed.getConnectionControl();
+         command.setFaultTolerant(protocolManager
+               .isFaultTolerantConfiguration());
+         if (info.isFailoverReconnect())
+         {
+            command.setRebalanceConnection(false);
+         }
+         dispatchAsync(command);
+      }
+      return null;
+   }
+
+   /**
+    * Dispatches a command to the peer, queueing it for the task runner when one exists
+    * and dispatching synchronously otherwise. While the connection is stopping nothing
+    * is sent; a message dispatch is post-processed and its callback told about the failure.
+    *
+    * @param message the command to dispatch
+    */
+   public void dispatchAsync(Command message)
+   {
+      if (stopping.get())
+      {
+         // too late to send anything: only release what a message dispatch holds
+         if (message.isMessageDispatch())
+         {
+            MessageDispatch rejected = (MessageDispatch) message;
+            TransmitCallback callback = rejected.getTransmitCallback();
+            protocolManager.postProcessDispatch(rejected);
+            if (callback != null)
+            {
+               callback.onFailure();
+            }
+         }
+         return;
+      }
+
+      if (taskRunner == null)
+      {
+         // no runner to hand the command to
+         dispatchSync(message);
+         return;
+      }
+
+      synchronized (dispatchQueue)
+      {
+         dispatchQueue.add(message);
+      }
+      try
+      {
+         taskRunner.wakeup();
+      }
+      catch (InterruptedException interrupted)
+      {
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   /**
+    * Dispatches a command on the calling thread. An IOException raised by the dispatch
+    * is handed to serviceExceptionAsync() instead of being thrown.
+    *
+    * @param message the command to dispatch
+    */
+   public void dispatchSync(Command message)
+   {
+      try
+      {
+         this.processDispatch(message);
+      }
+      catch (IOException transportFailure)
+      {
+         this.serviceExceptionAsync(transportFailure);
+      }
+   }
+
+   /**
+    * Runs serviceException(e) on a new thread. Only the first call starts a thread;
+    * later calls return without doing anything.
+    *
+    * @param e the exception to service
+    */
+   public void serviceExceptionAsync(final IOException e)
+   {
+      if (!asyncException.compareAndSet(false, true))
+      {
+         // a handler thread has already been started
+         return;
+      }
+
+      final Runnable handler = new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            serviceException(e);
+         }
+      };
+      new Thread(handler, "Async Exception Handler").start();
+   }
+
+   /**
+    * Reacts to an exception raised while servicing this connection.
+    * <ul>
+    * <li>IOException: forwarded to serviceTransportException().</li>
+    * <li>AMQBrokerStoppedException (exact class): the peer is sent a ConnectionError and the
+    * connection is stopped asynchronously, unless it is already stopping.</li>
+    * <li>anything else: the peer is sent a ConnectionError, unless the connection is stopping
+    * or this method is already running.</li>
+    * </ul>
+    *
+    * @param e the exception to handle
+    */
+   public void serviceException(Throwable e)
+   {
+      // are we a transport exception such as not being able to dispatch
+      // synchronously to a transport
+      if (e instanceof IOException)
+      {
+         serviceTransportException((IOException) e);
+      }
+      else if (e.getClass() == AMQBrokerStoppedException.class)
+      {
+         // Handle the case where the broker is stopped
+         // But the client is still connected.
+         if (!stopping.get())
+         {
+            ConnectionError ce = new ConnectionError();
+            ce.setException(e);
+            dispatchSync(ce);
+            // Record the error that caused the transport to stop
+            this.stopError = e;
+            // Wait a little bit to try to get the output buffer to flush
+            // the exception notification to the client.
+            try
+            {
+               Thread.sleep(500);
+            }
+            catch (InterruptedException ie)
+            {
+               Thread.currentThread().interrupt();
+            }
+            // Worst case is we just kill the connection before the
+            // notification gets to him.
+            stopAsync();
+         }
+      }
+      else if (!stopping.get() && !inServiceException)
+      {
+         // inServiceException is set for the duration of the dispatch so that a nested
+         // call of this method skips this branch
+         inServiceException = true;
+         try
+         {
+            ConnectionError ce = new ConnectionError();
+            ce.setException(e);
+            if (pendingStop)
+            {
+               // a stop is already pending: send on the calling thread rather than queueing
+               dispatchSync(ce);
+            }
+            else
+            {
+               dispatchAsync(ce);
+            }
+         }
+         finally
+         {
+            inServiceException = false;
+         }
+      }
+   }
+
+   /**
+    * Placeholder for transport failure handling. The method currently does nothing:
+    * its whole body is commented-out code kept for later.
+    *
+    * @param e the transport exception; currently ignored
+    */
+   public void serviceTransportException(IOException e)
+   {
+      /*
+       * deal with it later BrokerService bService =
+       * connector.getBrokerService(); if (bService.isShutdownOnSlaveFailure())
+       * { if (brokerInfo != null) { if (brokerInfo.isSlaveBroker()) {
+       * LOG.error("Slave has exception: {} shutting down master now.",
+       * e.getMessage(), e); try { doStop(); bService.stop(); } catch (Exception
+       * ex) { LOG.warn("Failed to stop the master", ex); } } } } if
+       * (!stopping.get() && !pendingStop) { transportException.set(e); if
+       * (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: "
+       * + e, e); } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
+       * TRANSPORTLOG.warn(this + " failed: " + e); } stopAsync(); }
+       */
+   }
+
+   /**
+    * Sets the marked-candidate flag. Clearing the flag also resets the time stamp
+    * and the blocked-candidate flag.
+    *
+    * @param markedCandidate the new value of the flag
+    */
+   public void setMarkedCandidate(boolean markedCandidate)
+   {
+      this.markedCandidate = markedCandidate;
+      if (markedCandidate)
+      {
+         return;
+      }
+
+      // no longer a candidate: forget the related bookkeeping
+      this.timeStamp = 0;
+      this.blockedCandidate = false;
+   }
+
+   /**
+    * Sends a command, keeping the marked-candidate flag set for the duration of the send.
+    *
+    * @param command the command to send
+    * @throws IOException if physicalSend() raises one
+    */
+   protected void dispatch(Command command) throws IOException
+   {
+      try
+      {
+         this.setMarkedCandidate(true);
+         physicalSend(command);
+      }
+      finally
+      {
+         // always cleared, whether or not the send succeeded
+         this.setMarkedCandidate(false);
+      }
+   }
+
+   /**
+    * Sends a command unless the connection is stopping and, for message dispatches,
+    * runs the protocol manager's pre/post processing and the transmit callback.
+    *
+    * @param command the command to send
+    * @throws IOException if sending a message dispatch fails
+    */
+   protected void processDispatch(Command command) throws IOException
+   {
+      // non-null only when the command is a message dispatch
+      MessageDispatch messageDispatch = (MessageDispatch) (command
+            .isMessageDispatch() ? command : null);
+      try
+      {
+         if (!stopping.get())
+         {
+            if (messageDispatch != null)
+            {
+               protocolManager.preProcessDispatch(messageDispatch);
+            }
+            dispatch(command);
+         }
+      }
+      catch (IOException e)
+      {
+         // NOTE(review): the exception is rethrown only for message dispatches; an IOException
+         // raised while sending any other command ends here - confirm this is intended.
+         if (messageDispatch != null)
+         {
+            TransmitCallback sub = messageDispatch.getTransmitCallback();
+            protocolManager.postProcessDispatch(messageDispatch);
+            if (sub != null)
+            {
+               sub.onFailure();
+            }
+            // cleared so that the finally block does not also report success
+            messageDispatch = null;
+            throw e;
+         }
+      }
+      finally
+      {
+         // NOTE(review): this also runs when the connection was stopping and nothing was sent,
+         // so onSuccess() is reported for a dispatch that was skipped - confirm this is intended.
+         if (messageDispatch != null)
+         {
+            TransmitCallback sub = messageDispatch.getTransmitCallback();
+            protocolManager.postProcessDispatch(messageDispatch);
+            if (sub != null)
+            {
+               sub.onSuccess();
+            }
+         }
+      }
+   }
+
+   /**
+    * @return the message authorization policy held by this connection; may be null
+    */
+   private AMQMessageAuthorizationPolicy getMessageAuthorizationPolicy()
+   {
+      return messageAuthorizationPolicy;
+   }
+
+   protected synchronized AMQTransportConnectionState unregisterConnectionState(
+         ConnectionId connectionId)
+   {
+      return connectionStateRegister.unregisterConnectionState(connectionId);
+   }
+
+   protected synchronized AMQTransportConnectionState registerConnectionState(
+         ConnectionId connectionId, AMQTransportConnectionState state)
+   {
+      AMQTransportConnectionState cs = null;
+      if (!connectionStateRegister.isEmpty()
+            && !connectionStateRegister.doesHandleMultipleConnectionStates())
+      {
+         // swap implementations
+         AMQTransportConnectionStateRegister newRegister = new AMQMapTransportConnectionStateRegister();
+         newRegister.intialize(connectionStateRegister);
+         connectionStateRegister = newRegister;
+      }
+      cs = connectionStateRegister.registerConnectionState(connectionId, state);
+      return cs;
+   }
+
+   /**
+    * Marks the connection as pending stop and schedules an asynchronous stop after a delay.
+    * Does nothing when {@code waitTime} is not positive.
+    *
+    * @param waitTime delay in milliseconds before stopAsync() is called
+    * @param reason human readable reason for the stop; used when reporting a scheduling failure
+    * @param cause the error recorded as the reason for the stop
+    */
+   public void delayedStop(final int waitTime, final String reason,
+         Throwable cause)
+   {
+      if (waitTime > 0)
+      {
+         synchronized (this)
+         {
+            pendingStop = true;
+            stopError = cause;
+         }
+         try
+         {
+            stopTaskRunnerFactory.execute(new Runnable()
+            {
+               @Override
+               public void run()
+               {
+                  try
+                  {
+                     Thread.sleep(waitTime);
+                     stopAsync();
+                  }
+                  catch (InterruptedException e)
+                  {
+                     // give up on the delayed stop, but keep the interrupt visible to the
+                     // thread's owner, as the other interrupt handlers in this class do
+                     Thread.currentThread().interrupt();
+                  }
+               }
+            });
+         }
+         catch (Throwable t)
+         {
+            // Scheduling failed, so the connection will not be stopped by this call.
+            // Report it instead of hiding it.
+            HornetQServerLogger.LOGGER.warn("Failed to schedule delayed stop of connection: " + reason, t);
+         }
+      }
+   }
+
+   /**
+    * Starts stopping this connection without blocking the caller. The first call that
+    * gets past the {@code starting} check flags every connection context as stopping and
+    * schedules doStop(); later calls only set {@code pendingStop}. The {@code stopped}
+    * latch is counted down once doStop() has run or could not be scheduled.
+    */
+   public void stopAsync()
+   {
+      // If we're in the middle of starting then go no further... for now.
+      synchronized (this)
+      {
+         pendingStop = true;
+         if (starting)
+         {
+            // log
+            return;
+         }
+      }
+      if (stopping.compareAndSet(false, true))
+      {
+         // Let all the connection contexts know we are shutting down
+         // so that in progress operations can notice and unblock.
+         List<AMQTransportConnectionState> connectionStates = listConnectionStates();
+         for (AMQTransportConnectionState cs : connectionStates)
+         {
+            AMQConnectionContext connectionContext = cs.getContext();
+            if (connectionContext != null)
+            {
+               connectionContext.getStopping().set(true);
+            }
+         }
+         try
+         {
+            stopTaskRunnerFactory.execute(new Runnable()
+            {
+               @Override
+               public void run()
+               {
+                  serviceLock.writeLock().lock();
+                  try
+                  {
+                     doStop();
+                  }
+                  catch (Throwable e)
+                  {
+                     // the stop is best effort, but a failure must leave a trace
+                     HornetQServerLogger.LOGGER.warn("Error while stopping OpenWire connection", e);
+                  }
+                  finally
+                  {
+                     stopped.countDown();
+                     serviceLock.writeLock().unlock();
+                  }
+               }
+            });
+         }
+         catch (Throwable t)
+         {
+            // The stop task could not be scheduled, so doStop() will not run.
+            // Report it and release anyone waiting on the latch.
+            HornetQServerLogger.LOGGER.warn("Failed to schedule asynchronous stop of OpenWire connection", t);
+            stopped.countDown();
+         }
+      }
+   }
+
+   protected synchronized List<AMQTransportConnectionState> listConnectionStates()
+   {
+      return connectionStateRegister.listConnectionStates();
+   }
+
+   /**
+    * Performs the actual shutdown of this connection.
+    * <p>
+    * In order: notifies the acceptor, closes the transport connection
+    * (failures are ignored), shuts down the task runner if there is one,
+    * marks the connection inactive, runs the failure callbacks of all queued
+    * message dispatches and clears the dispatch queue, and finally, unless
+    * the protocol manager is already stopped, removes every logical
+    * connection registered on this connection.
+    *
+    * @throws Exception if the acceptor notification, the task runner
+    *         shutdown or the dispatch clean-up fails
+    */
+   protected void doStop() throws Exception
+   {
+      this.acceptorUsed.onStopped(this);
+      /*
+       * What's a duplex bridge? try { synchronized (this) { if (duplexBridge !=
+       * null) { duplexBridge.stop(); } } } catch (Exception ignore) {
+       * LOG.trace("Exception caught stopping. This exception is ignored.",
+       * ignore); }
+       */
+      try
+      {
+         getTransportConnection().close();
+      }
+      catch (Exception e)
+      {
+         // log
+      }
+
+      if (taskRunner != null)
+      {
+         taskRunner.shutdown(1);
+         taskRunner = null;
+      }
+
+      active = false;
+      // Run the MessageDispatch callbacks so that message references get
+      // cleaned up.
+      synchronized (dispatchQueue)
+      {
+         for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();)
+         {
+            Command command = iter.next();
+            if (command.isMessageDispatch())
+            {
+               MessageDispatch md = (MessageDispatch) command;
+               TransmitCallback sub = md.getTransmitCallback();
+               protocolManager.postProcessDispatch(md);
+               if (sub != null)
+               {
+                  sub.onFailure();
+               }
+            }
+         }
+         dispatchQueue.clear();
+      }
+      //
+      // Remove all logical connection associated with this connection
+      // from the broker.
+      if (!protocolManager.isStopped())
+      {
+         List<AMQTransportConnectionState> connectionStates = listConnectionStates();
+         for (AMQTransportConnectionState cs : connectionStates)
+         {
+            // A state may have no context; stopAsync() guards against this
+            // too. Without the check one missing context would abort the
+            // removal of all remaining connections.
+            AMQConnectionContext connectionContext = cs.getContext();
+            if (connectionContext != null)
+            {
+               connectionContext.getStopping().set(true);
+            }
+            try
+            {
+               processRemoveConnection(cs.getInfo().getConnectionId(), 0L);
+            }
+            catch (Throwable removalFailure)
+            {
+               // Keep going so the other connections are still removed.
+               removalFailure.printStackTrace();
+            }
+         }
+      }
+   }
+
+   /**
+    * Registers a consumer through the protocol manager.
+    *
+    * @param info the consumer to add
+    * @return {@code null} on success; otherwise an {@code ExceptionResponse}
+    *         wrapping the failure, where a {@code HornetQSecurityException}
+    *         is replaced by a {@code JMSSecurityException} with the same message
+    */
+   @Override
+   public Response processAddConsumer(ConsumerInfo info)
+   {
+      try
+      {
+         protocolManager.addConsumer(this, info);
+         return null;
+      }
+      catch (Exception failure)
+      {
+         Exception reported = failure instanceof HornetQSecurityException
+            ? new JMSSecurityException(failure.getMessage())
+            : failure;
+         return new ExceptionResponse(reported);
+      }
+   }
+
+   /**
+    * Returns the broker exchange registered for the given consumer, creating
+    * and registering one if none exists yet.
+    * <p>
+    * A new exchange gets the context of the consumer's connection state,
+    * which is also stored in this connection's {@code context} field. It is
+    * flagged as wildcard when the consumer's destination is a pattern.
+    *
+    * @param id the consumer whose exchange is wanted
+    * @return the existing or newly created exchange
+    */
+   AMQConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id)
+   {
+      AMQConsumerBrokerExchange result = consumerExchanges.get(id);
+      if (result != null)
+      {
+         return result;
+      }
+      synchronized (consumerExchanges)
+      {
+         // Look again under the lock: another thread may have registered an
+         // exchange for this id since the lookup above, and replacing it
+         // would leave two exchanges in use for the same consumer.
+         result = consumerExchanges.get(id);
+         if (result != null)
+         {
+            return result;
+         }
+         result = new AMQConsumerBrokerExchange();
+         AMQTransportConnectionState state = lookupConnectionState(id);
+         context = state.getContext();
+         result.setConnectionContext(context);
+         SessionState ss = state.getSessionState(id.getParentId());
+         ConsumerState cs = ss == null ? null : ss.getConsumerState(id);
+         ConsumerInfo info = cs == null ? null : cs.getInfo();
+         if (info != null && info.getDestination() != null
+               && info.getDestination().isPattern())
+         {
+            result.setWildcard(true);
+         }
+         consumerExchanges.put(id, result);
+      }
+      return result;
+   }
+
+   protected synchronized AMQTransportConnectionState lookupConnectionState(
+         ConsumerId id)
+   {
+      return connectionStateRegister.lookupConnectionState(id);
+   }
+
+   protected synchronized AMQTransportConnectionState lookupConnectionState(
+         ProducerId id)
+   {
+      return connectionStateRegister.lookupConnectionState(id);
+   }
+
+   /**
+    * Counts the consumers registered across all sessions of a connection.
+    *
+    * @param connectionId the connection to inspect
+    * @return the summed consumer count, or 0 when the connection has no
+    *         registered state
+    */
+   public int getConsumerCount(ConnectionId connectionId)
+   {
+      AMQTransportConnectionState connectionState = lookupConnectionState(connectionId);
+      if (connectionState == null)
+      {
+         return 0;
+      }
+      int total = 0;
+      for (SessionId sessionId : connectionState.getSessionIds())
+      {
+         SessionState session = connectionState.getSessionState(sessionId);
+         if (session == null)
+         {
+            continue;
+         }
+         total += session.getConsumerIds().size();
+      }
+      return total;
+   }
+
+   /**
+    * Counts the producers registered across all sessions of a connection.
+    *
+    * @param connectionId the connection to inspect
+    * @return the summed producer count, or 0 when the connection has no
+    *         registered state
+    */
+   public int getProducerCount(ConnectionId connectionId)
+   {
+      AMQTransportConnectionState connectionState = lookupConnectionState(connectionId);
+      if (connectionState == null)
+      {
+         return 0;
+      }
+      int total = 0;
+      for (SessionId sessionId : connectionState.getSessionIds())
+      {
+         SessionState session = connectionState.getSessionState(sessionId);
+         if (session == null)
+         {
+            continue;
+         }
+         total += session.getProducerIds().size();
+      }
+      return total;
+   }
+
+   public synchronized AMQTransportConnectionState lookupConnectionState(
+         ConnectionId connectionId)
+   {
+      return connectionStateRegister.lookupConnectionState(connectionId);
+   }
+
+   /**
+    * Adds a destination through the protocol manager.
+    *
+    * @param dest the destination to add
+    * @return {@code null} on success; otherwise an {@code ExceptionResponse}
+    *         wrapping the failure, where a {@code HornetQSecurityException}
+    *         is replaced by a {@code JMSSecurityException} with the same message
+    */
+   @Override
+   public Response processAddDestination(DestinationInfo dest) throws Exception
+   {
+      try
+      {
+         protocolManager.addDestination(this, dest);
+         return null;
+      }
+      catch (Exception failure)
+      {
+         Exception reported = failure instanceof HornetQSecurityException
+            ? new JMSSecurityException(failure.getMessage())
+            : failure;
+         return new ExceptionResponse(reported);
+      }
+   }
+
+   /**
+    * Registers a producer through the protocol manager.
+    *
+    * @param info the producer to add
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails to add the producer
+    */
+   @Override
+   public Response processAddProducer(ProducerInfo info) throws Exception
+   {
+      final Response noReply = null;
+      protocolManager.addProducer(this, info);
+      return noReply;
+   }
+
+   /**
+    * Adds a session to its parent connection.
+    * <p>
+    * Nothing happens when the parent connection has no registered state or
+    * already knows the session. If recording the session in the connection
+    * state fails with an {@code IllegalStateException}, the session is
+    * removed from the protocol manager again.
+    *
+    * @param info the session to add
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails
+    */
+   @Override
+   public Response processAddSession(SessionInfo info) throws Exception
+   {
+      ConnectionId parentId = info.getSessionId().getParentId();
+      AMQTransportConnectionState connectionState = lookupConnectionState(parentId);
+      // Avoid replaying dup commands
+      if (connectionState == null || connectionState.getSessionIds().contains(info.getSessionId()))
+      {
+         return null;
+      }
+      protocolManager.addSession(this, info);
+      try
+      {
+         connectionState.addSession(info);
+      }
+      catch (IllegalStateException rejected)
+      {
+         rejected.printStackTrace();
+         protocolManager.removeSession(connectionState.getContext(), info);
+      }
+      return null;
+   }
+
+   /**
+    * Records the transaction in {@code txMap} unless its id is already there.
+    *
+    * @param info the transaction being started
+    * @return always {@code null}
+    */
+   @Override
+   public Response processBeginTransaction(TransactionInfo info) throws Exception
+   {
+      final TransactionId txId = info.getTransactionId();
+      final boolean alreadyTracked = txMap.containsKey(txId);
+      if (!alreadyTracked)
+      {
+         txMap.put(txId, info);
+      }
+      return null;
+   }
+
+   /**
+    * Broker info commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processBrokerInfo(BrokerInfo arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Commits the transaction in one phase through the protocol manager, then
+    * drops it from {@code txMap}.
+    *
+    * @param info the transaction to commit
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails; the transaction then
+    *         stays in {@code txMap}
+    */
+   @Override
+   public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception
+   {
+      protocolManager.commitTransactionOnePhase(info);
+      txMap.remove(info.getTransactionId());
+      return null;
+   }
+
+   /**
+    * Commits the transaction in two phases through the protocol manager, then
+    * drops it from {@code txMap}.
+    *
+    * @param info the transaction to commit
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails; the transaction then
+    *         stays in {@code txMap}
+    */
+   @Override
+   public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception
+   {
+      protocolManager.commitTransactionTwoPhase(info);
+      txMap.remove(info.getTransactionId());
+      return null;
+   }
+
+   /**
+    * Connection control commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processConnectionControl(ConnectionControl arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Connection error commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processConnectionError(ConnectionError arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Consumer control commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processConsumerControl(ConsumerControl arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Control commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processControlCommand(ControlCommand arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Records the transaction in {@code txMap} unless its id is already there.
+    *
+    * @param info the transaction being ended
+    * @return always {@code null}
+    */
+   @Override
+   public Response processEndTransaction(TransactionInfo info) throws Exception
+   {
+      final TransactionId txId = info.getTransactionId();
+      if (txMap.containsKey(txId))
+      {
+         return null;
+      }
+      txMap.put(txId, info);
+      return null;
+   }
+
+   /**
+    * Flush commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processFlush(FlushCommand arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Drops the transaction from {@code txMap} and then asks the protocol
+    * manager to forget it.
+    *
+    * @param info the transaction to forget
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails
+    */
+   @Override
+   public Response processForgetTransaction(TransactionInfo info) throws Exception
+   {
+      txMap.remove(info.getTransactionId());
+      protocolManager.forgetTransaction(info.getTransactionId());
+      return null;
+   }
+
+   /**
+    * Keep-alive commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processKeepAlive(KeepAliveInfo arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Handles a message sent by a producer on this connection.
+    * <p>
+    * The message is sent through the producer's session when the producer
+    * exchange allows dispatch. If the send result asks to block the next
+    * send, the method either fails with a {@code ResourceAllocationException},
+    * suppresses the response, or blocks until space is available. Otherwise
+    * a {@code ProducerAck} is dispatched when one is due.
+    *
+    * @param messageSend the message to process
+    * @return {@code null} unless an exception was thrown, in which case an
+    *         {@code ExceptionResponse} wrapping it (a
+    *         {@code HornetQSecurityException} is replaced by a
+    *         {@code JMSSecurityException} with the same message)
+    */
+   @Override
+   public Response processMessage(Message messageSend)
+   {
+      Response resp = null;
+      try
+      {
+         ProducerId producerId = messageSend.getProducerId();
+         // Also updates this connection's 'context' field as a side effect.
+         AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
+         final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
+         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+         // An ack is due only when the client is not waiting for a response,
+         // the producer uses a window, and the context is not in recovery mode.
+         boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0
+               && !pcontext.isInRecoveryMode();
+
+         AMQSession session = protocolManager.getSession(producerId.getParentId());
+
+         if (producerExchange.canDispatch(messageSend))
+         {
+            SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
+            if (result.isBlockNextSend())
+            {
+               // NOTE(review): 'context' is dereferenced here but only
+               // null-checked further down; if it can be null, this line
+               // throws first and the later check never helps. Confirm.
+               if (!context.isNetworkConnection() && result.isSendFailIfNoSpace())
+               {
+                  throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
+                     + producerId + ") to prevent flooding "
+                     + result.getBlockingAddress() + "."
+                     + " See http://activemq.apache.org/producer-flow-control.html for more info");
+               }
+
+               if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired())
+               {
+                  //in that case don't send the response
+                  //this will force the client to wait until
+                  //the response is got.
+                  if (context == null)
+                  {
+                     this.context = new AMQConnectionContext();
+                  }
+                  context.setDontSendReponse(true);
+               }
+               else
+               {
+                  //hang the connection until the space is available
+                  session.blockingWaitForSpace(producerExchange, result);
+               }
+            }
+            else if (sendProducerAck)
+            {
+               ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+               this.dispatchAsync(ack);
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         // Failures are reported to the client as a response, not rethrown.
+         if (e instanceof HornetQSecurityException)
+         {
+            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+         }
+         else
+         {
+            resp = new ExceptionResponse(e);
+         }
+      }
+      return resp;
+   }
+
+   /**
+    * Returns the broker exchange registered for the given producer, creating
+    * and registering one if none exists yet. In both cases this connection's
+    * {@code context} field is set to the exchange's connection context.
+    * <p>
+    * A new exchange gets the context of the producer's connection state. Its
+    * last stored sequence id is loaded from the persistence adapter when the
+    * context is a reconnect, or is a network connection on an acceptor that
+    * audits network producers. It is marked mutable when the producer has no
+    * destination or a composite one.
+    *
+    * @param id the producer whose exchange is wanted
+    * @return the existing or newly created exchange
+    * @throws IOException declared by the signature; presumably raised by the
+    *         persistence adapter lookup — confirm
+    */
+   private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException
+   {
+      AMQProducerBrokerExchange result = producerExchanges.get(id);
+      if (result == null)
+      {
+         synchronized (producerExchanges)
+         {
+            // Look again under the lock: another thread may have registered
+            // an exchange for this id since the lookup above, and replacing
+            // it would leave two exchanges in use for the same producer.
+            result = producerExchanges.get(id);
+            if (result == null)
+            {
+               result = new AMQProducerBrokerExchange();
+               AMQTransportConnectionState state = lookupConnectionState(id);
+               context = state.getContext();
+               result.setConnectionContext(context);
+               if (context.isReconnect()
+                     || (context.isNetworkConnection() && this.acceptorUsed
+                           .isAuditNetworkProducers()))
+               {
+                  result.setLastStoredSequenceId(protocolManager
+                        .getPersistenceAdapter().getLastProducerSequenceId(id));
+               }
+               SessionState ss = state.getSessionState(id.getParentId());
+               if (ss != null)
+               {
+                  // Fetch the producer state once and reuse it.
+                  ProducerState producerState = ss.getProducerState(id);
+                  result.setProducerState(producerState);
+                  if (producerState != null && producerState.getInfo() != null)
+                  {
+                     ProducerInfo info = producerState.getInfo();
+                     result.setMutable(info.getDestination() == null
+                           || info.getDestination().isComposite());
+                  }
+               }
+               producerExchanges.put(id, result);
+               return result;
+            }
+         }
+      }
+      // An exchange already existed: adopt its context.
+      context = result.getConnectionContext();
+      return result;
+   }
+
+   /**
+    * Passes an acknowledgement to the session that owns the acknowledging
+    * consumer.
+    *
+    * @param ack the acknowledgement to process
+    * @return always {@code null}
+    * @throws Exception if the session fails to acknowledge
+    */
+   @Override
+   public Response processMessageAck(MessageAck ack) throws Exception
+   {
+      final SessionId owningSession = ack.getConsumerId().getParentId();
+      protocolManager.getSession(owningSession).acknowledge(ack);
+      return null;
+   }
+
+   /**
+    * Message dispatch commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processMessageDispatch(MessageDispatch arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   @Override
+   public Response processMessageDispatchNotification(
+         MessageDispatchNotification arg0) throws Exception
+   {
+      throw new IllegalStateException("not implemented! ");
+   }
+
+   /**
+    * Message pull commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processMessagePull(MessagePull arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Prepares the transaction through the protocol manager.
+    *
+    * @param info the transaction to prepare
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails
+    */
+   @Override
+   public Response processPrepareTransaction(TransactionInfo info) throws Exception
+   {
+      final Response noReply = null;
+      protocolManager.prepareTransaction(info);
+      return noReply;
+   }
+
+   /**
+    * Producer ack commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processProducerAck(ProducerAck arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Asks the protocol manager to recover transactions for the sessions of
+    * the connection named in {@code info}.
+    *
+    * @param info carries the id of the connection to recover for
+    * @return a {@code DataArrayResponse} holding the recovered transaction ids
+    * @throws Exception if recovery fails
+    */
+   @Override
+   public Response processRecoverTransactions(TransactionInfo info) throws Exception
+   {
+      // NOTE(review): the connection state is not null-checked here, unlike
+      // in most sibling methods.
+      final Set<SessionId> sessionIds = lookupConnectionState(info.getConnectionId()).getSessionIds();
+      return new DataArrayResponse(protocolManager.recoverTransactions(sessionIds));
+   }
+
+   /**
+    * Removes a logical connection and everything registered under it.
+    * <p>
+    * When the connection has registered state, the state is shut down, each
+    * of its sessions is removed, the connection is removed from the protocol
+    * manager, and the state is unregistered. If that drops the state's
+    * reference count to zero, its entry in {@code brokerConnectionStates} is
+    * removed as well. Failures while removing sessions or the connection are
+    * ignored. An unknown connection id is a no-op.
+    *
+    * @param id                      the connection to remove
+    * @param lastDeliveredSequenceId passed on to each session removal
+    * @return always {@code null}
+    */
+   @Override
+   public Response processRemoveConnection(ConnectionId id,
+         long lastDeliveredSequenceId) throws Exception
+   {
+      final AMQTransportConnectionState connectionState = lookupConnectionState(id);
+      if (connectionState == null)
+      {
+         return null;
+      }
+
+      // Don't allow things to be added to the connection state while we
+      // are shutting down.
+      connectionState.shutdown();
+
+      // Cascade the connection stop to the sessions.
+      for (SessionId sessionId : connectionState.getSessionIds())
+      {
+         try
+         {
+            processRemoveSession(sessionId, lastDeliveredSequenceId);
+         }
+         catch (Throwable e)
+         {
+            // LOG
+         }
+      }
+
+      try
+      {
+         protocolManager.removeConnection(connectionState.getContext(), connectionState.getInfo(), null);
+      }
+      catch (Throwable e)
+      {
+         // log
+      }
+
+      final AMQTransportConnectionState unregistered = unregisterConnectionState(id);
+      if (unregistered == null)
+      {
+         return null;
+      }
+      synchronized (brokerConnectionStates)
+      {
+         // If we are the last reference, we should remove the state
+         // from the broker.
+         final boolean lastReference = unregistered.decrementReference() == 0;
+         if (lastReference)
+         {
+            brokerConnectionStates.remove(id);
+         }
+      }
+      return null;
+   }
+
+   /**
+    * Removes a consumer from its session, from the protocol manager and from
+    * the consumer exchange map.
+    *
+    * @param id                      the consumer to remove
+    * @param lastDeliveredSequenceId stored on the consumer's info before the
+    *                                protocol manager is called
+    * @return always {@code null}
+    * @throws IllegalStateException if the consumer's connection, session or
+    *         the consumer itself is not registered
+    */
+   @Override
+   public Response processRemoveConsumer(ConsumerId id,
+         long lastDeliveredSequenceId) throws Exception
+   {
+      final SessionId sessionId = id.getParentId();
+      final ConnectionId connectionId = sessionId.getParentId();
+
+      final AMQTransportConnectionState connectionState = lookupConnectionState(connectionId);
+      if (connectionState == null)
+      {
+         throw new IllegalStateException(
+               "Cannot remove a consumer from a connection that had not been registered: "
+                     + connectionId);
+      }
+
+      final SessionState sessionState = connectionState.getSessionState(sessionId);
+      if (sessionState == null)
+      {
+         throw new IllegalStateException(
+               "Cannot remove a consumer from a session that had not been registered: "
+                     + sessionId);
+      }
+
+      final ConsumerState removed = sessionState.removeConsumer(id);
+      if (removed == null)
+      {
+         throw new IllegalStateException(
+               "Cannot remove a consumer that had not been registered: " + id);
+      }
+
+      removed.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId);
+      protocolManager.removeConsumer(connectionState.getContext(), removed.getInfo());
+      removeConsumerBrokerExchange(id);
+      return null;
+   }
+
+   /**
+    * Drops the exchange registered for the given consumer, holding the
+    * {@code consumerExchanges} monitor while doing so.
+    *
+    * @param id the consumer whose exchange is removed
+    */
+   private void removeConsumerBrokerExchange(final ConsumerId id)
+   {
+      synchronized (consumerExchanges)
+      {
+         consumerExchanges.remove(id);
+      }
+   }
+
+   /**
+    * Deletes the queue behind a destination. Destinations that are not
+    * queues are left untouched.
+    *
+    * @param info describes the destination to remove
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails to delete the queue
+    */
+   @Override
+   public Response processRemoveDestination(DestinationInfo info) throws Exception
+   {
+      final ActiveMQDestination destination = info.getDestination();
+      if (!destination.isQueue())
+      {
+         return null;
+      }
+      // Queues are deleted under the "jms.queue." prefix.
+      protocolManager.deleteQueue("jms.queue." + destination.getPhysicalName());
+      return null;
+   }
+
+   /**
+    * Removes a producer through the protocol manager.
+    *
+    * @param id the producer to remove
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails
+    */
+   @Override
+   public Response processRemoveProducer(ProducerId id) throws Exception
+   {
+      final Response noReply = null;
+      protocolManager.removeProducer(id);
+      return noReply;
+   }
+
+   /**
+    * Removes a session together with its consumers and producers.
+    * <p>
+    * The session is shut down first, then each consumer and producer is
+    * removed (individual failures are ignored), and finally the session is
+    * removed from the connection state and from the protocol manager.
+    *
+    * @param id                      the session to remove
+    * @param lastDeliveredSequenceId passed on to each consumer removal
+    * @return always {@code null}
+    * @throws IllegalStateException if the session or its connection is not
+    *         registered
+    */
+   @Override
+   public Response processRemoveSession(SessionId id,
+         long lastDeliveredSequenceId) throws Exception
+   {
+      final ConnectionId connectionId = id.getParentId();
+      final AMQTransportConnectionState connectionState = lookupConnectionState(connectionId);
+      if (connectionState == null)
+      {
+         throw new IllegalStateException(
+               "Cannot remove session from connection that had not been registered: "
+                     + connectionId);
+      }
+
+      final SessionState sessionState = connectionState.getSessionState(id);
+      if (sessionState == null)
+      {
+         throw new IllegalStateException(
+               "Cannot remove session that had not been registered: " + id);
+      }
+
+      // Don't let new consumers or producers get added while we are closing
+      // this down.
+      sessionState.shutdown();
+
+      // Cascade the connection stop to the consumers and producers.
+      for (ConsumerId consumerId : sessionState.getConsumerIds())
+      {
+         try
+         {
+            processRemoveConsumer(consumerId, lastDeliveredSequenceId);
+         }
+         catch (Throwable e)
+         {
+            // LOG.warn("Failed to remove consumer: {}", consumerId, e);
+         }
+      }
+      for (ProducerId producerId : sessionState.getProducerIds())
+      {
+         try
+         {
+            processRemoveProducer(producerId);
+         }
+         catch (Throwable e)
+         {
+            // LOG.warn("Failed to remove producer: {}", producerId, e);
+         }
+      }
+
+      connectionState.removeSession(id);
+      protocolManager.removeSession(connectionState.getContext(), sessionState.getInfo());
+      return null;
+   }
+
+   /**
+    * Remove-subscription commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processRemoveSubscription(RemoveSubscriptionInfo arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * Rolls the transaction back through the protocol manager, then drops it
+    * from {@code txMap}.
+    *
+    * @param info the transaction to roll back
+    * @return always {@code null}
+    * @throws Exception if the protocol manager fails; the transaction then
+    *         stays in {@code txMap}
+    */
+   @Override
+   public Response processRollbackTransaction(TransactionInfo info) throws Exception
+   {
+      protocolManager.rollbackTransaction(info);
+      txMap.remove(info.getTransactionId());
+      return null;
+   }
+
+   /**
+    * Accepts a shutdown command without acting on it.
+    *
+    * @param info the shutdown command (unused)
+    * @return always {@code null}
+    */
+   @Override
+   public Response processShutdown(ShutdownInfo info) throws Exception
+   {
+      final Response noReply = null;
+      return noReply;
+   }
+
+   /**
+    * Wire format commands are not implemented by this connection.
+    *
+    * @throws IllegalStateException always
+    */
+   @Override
+   public Response processWireFormat(WireFormatInfo arg0) throws Exception
+   {
+      final String message = "not implemented! ";
+      throw new IllegalStateException(message);
+   }
+
+   /**
+    * @return the per-connection consumer limit reported by the acceptor this
+    *         connection uses
+    */
+   public int getMaximumConsumersAllowedPerConnection()
+   {
+      final int limit = acceptorUsed.getMaximumConsumersAllowedPerConnection();
+      return limit;
+   }
+
+   /**
+    * @return the per-connection producer limit reported by the acceptor this
+    *         connection uses
+    */
+   public int getMaximumProducersAllowedPerConnection()
+   {
+      final int limit = acceptorUsed.getMaximumProducersAllowedPerConnection();
+      return limit;
+   }
+
+   /**
+    * Sends a dispatch to the client through the protocol manager. If the
+    * dispatch carries a message, its broker-out time is first set to the
+    * current time in milliseconds.
+    *
+    * @param dispatch the dispatch to deliver
+    */
+   public void deliverMessage(MessageDispatch dispatch)
+   {
+      final Message payload = dispatch.getMessage();
+      if (payload != null)
+      {
+         payload.setBrokerOutTime(System.currentTimeMillis());
+      }
+      protocolManager.send(this, dispatch);
+   }
+
+   /**
+    * @return the wire format held by this connection
+    */
+   public WireFormat getMarshaller()
+   {
+      final WireFormat marshaller = wireFormat;
+      return marshaller;
+   }
+
+   /**
+    * Adds the queue name, as a string, to {@code tempQueues}.
+    *
+    * @param qName name of the temporary queue
+    */
+   public void registerTempQueue(SimpleString qName)
+   {
+      final String name = qName.toString();
+      tempQueues.add(name);
+   }
+
+   /**
+    * Disconnects by destroying this connection. Neither argument is used.
+    *
+    * @param reason ignored
+    * @param fail   ignored
+    */
+   @Override
+   public void disconnect(String reason, boolean fail)
+   {
+      this.destroy();
+   }
+
+   /**
+    * Handles a failure by destroying this connection. Neither argument is
+    * used.
+    *
+    * @param e       ignored
+    * @param message ignored
+    */
+   @Override
+   public void fail(HornetQException e, String message)
+   {
+      this.destroy();
+   }
+
+   /**
+    * Stores the advisory session for this connection.
+    *
+    * @param amqSession the session to store
+    */
+   public void setAdvisorySession(AMQSession amqSession)
+   {
+      advisorySession = amqSession;
+   }
+
+   /**
+    * @return the advisory session stored on this connection
+    */
+   public AMQSession getAdvisorySession()
+   {
+      final AMQSession session = advisorySession;
+      return session;
+   }
+
+   /**
+    * Returns the context of this connection's {@code state}. The method name
+    * is misspelled ("Conext") and is kept as is for existing callers.
+    *
+    * @return {@code state.getContext()}
+    */
+   public AMQConnectionContext getConext()
+   {
+      return state.getContext();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java
new file mode 100644
index 0000000..1024622
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java
@@ -0,0 +1,787 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQPropertyConversionException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.openwire.amq.AMQConsumer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.MessageConverter;
+import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
+
+public class OpenWireMessageConverter implements MessageConverter
+{
+   public static final String AMQ_PREFIX = "__HDR_";
+   public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
+
+   private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL";
+   private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME";
+
+   private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH";
+   private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
+   private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
+   private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
+   private static final String AMQ_MSG_DESTINATION = AMQ_PREFIX + "DESTINATION";
+   private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID";
+   private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
+   private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
+   private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION";
+   private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
+   private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
+   private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
+   private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
+   private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
+
+   private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
+   private static final String AMQ_MSG_TX_ID = AMQ_PREFIX + "TX_ID";
+   private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
+
+   private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
+   private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
+
+   /**
+    * Not implemented yet.
+    *
+    * @param message the foreign message to convert (currently ignored)
+    * @return always {@code null}
+    */
+   @Override
+   public ServerMessage inbound(Object message)
+   {
+      // TODO: implement this
+      final ServerMessage notImplemented = null;
+      return notImplemented;
+   }
+
+   /**
+    * Not implemented yet.
+    *
+    * @param message       the core message to convert (currently ignored)
+    * @param deliveryCount the delivery count (currently ignored)
+    * @return always {@code null}
+    */
+   public Object outbound(ServerMessage message, int deliveryCount)
+   {
+      // TODO: implement this
+      final Object notImplemented = null;
+      return notImplemented;
+   }
+
+   /**
+    * Converts an ActiveMQ message into a core message.
+    * <p>
+    * The standard header fields (type, durability, expiration, priority,
+    * timestamp) are copied directly, the body is re-encoded according to
+    * the core message type, and the ActiveMQ specific attributes are stored
+    * as core properties whose names start with {@link #AMQ_PREFIX}.
+    *
+    * @param coreMessage the core message to populate
+    * @param messageSend the ActiveMQ message to read from
+    * @param marshaller  wire format used to marshal ActiveMQ structures
+    *                    (ids, destinations) into byte properties
+    * @throws IOException if the body or the properties cannot be read, or a
+    *                     structure cannot be marshalled
+    * @throws IllegalStateException if the ActiveMQ message type has no core
+    *                               equivalent (see {@link #toCoreType(byte)})
+    */
+   public static void toCoreMessage(ServerMessageImpl coreMessage, Message messageSend, WireFormat marshaller) throws IOException
+   {
+      String type = messageSend.getType();
+      if (type != null)
+      {
+         coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
+      }
+      coreMessage.setDurable(messageSend.isPersistent());
+      coreMessage.setExpiration(messageSend.getExpiration());
+      coreMessage.setPriority(messageSend.getPriority());
+      coreMessage.setTimestamp(messageSend.getTimestamp());
+
+      byte coreType = toCoreType(messageSend.getDataStructureType());
+      coreMessage.setType(coreType);
+
+      ByteSequence contents = messageSend.getContent();
+      if (contents != null)
+      {
+         writeCoreBody(coreMessage.getBodyBuffer(), coreType, contents);
+      }
+
+      //amq specific
+      coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
+      coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
+      BrokerId[] brokers = messageSend.getBrokerPath();
+      if (brokers != null)
+      {
+         coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, joinBrokerIds(brokers));
+      }
+      BrokerId[] cluster = messageSend.getCluster();
+      if (cluster != null)
+      {
+         coreMessage.putStringProperty(AMQ_MSG_CLUSTER, joinBrokerIds(cluster));
+      }
+
+      coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
+      String corrId = messageSend.getCorrelationId();
+      if (corrId != null)
+      {
+         coreMessage.putStringProperty("JMSCorrelationID", corrId);
+      }
+      DataStructure ds = messageSend.getDataStructure();
+      if (ds != null)
+      {
+         putMarshalledProperty(coreMessage, AMQ_MSG_DATASTRUCTURE, marshaller, ds);
+      }
+      // destination and message id are marshalled unconditionally
+      putMarshalledProperty(coreMessage, AMQ_MSG_DESTINATION, marshaller, messageSend.getDestination());
+      String groupId = messageSend.getGroupID();
+      if (groupId != null)
+      {
+         coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
+      }
+      coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
+
+      putMarshalledProperty(coreMessage, AMQ_MSG_MESSAGE_ID, marshaller, messageSend.getMessageId());
+
+      ActiveMQDestination origDest = messageSend.getOriginalDestination();
+      if (origDest != null)
+      {
+         putMarshalledProperty(coreMessage, AMQ_MSG_ORIG_DESTINATION, marshaller, origDest);
+      }
+      TransactionId origTxId = messageSend.getOriginalTransactionId();
+      if (origTxId != null)
+      {
+         putMarshalledProperty(coreMessage, AMQ_MSG_ORIG_TXID, marshaller, origTxId);
+      }
+      ProducerId producerId = messageSend.getProducerId();
+      if (producerId != null)
+      {
+         putMarshalledProperty(coreMessage, AMQ_MSG_PRODUCER_ID, marshaller, producerId);
+      }
+      ByteSequence propBytes = messageSend.getMarshalledProperties();
+      if (propBytes != null)
+      {
+         propBytes.compact();
+         coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
+         //unmarshall properties to core so selector will work
+         Map<String, Object> props = messageSend.getProperties();
+         for (Entry<String, Object> ent : props.entrySet())
+         {
+            Object value = ent.getValue();
+            try
+            {
+               coreMessage.putObjectProperty(ent.getKey(), value);
+            }
+            catch (HornetQPropertyConversionException e)
+            {
+               // value type not supported as a core property: store its string form
+               coreMessage.putStringProperty(ent.getKey(), value.toString());
+            }
+         }
+      }
+
+      coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter());
+      ActiveMQDestination replyTo = messageSend.getReplyTo();
+      if (replyTo != null)
+      {
+         putMarshalledProperty(coreMessage, AMQ_MSG_REPLY_TO, marshaller, replyTo);
+      }
+
+      ConsumerId consumerId = messageSend.getTargetConsumerId();
+      if (consumerId != null)
+      {
+         putMarshalledProperty(coreMessage, AMQ_MSG_CONSUMER_ID, marshaller, consumerId);
+      }
+      TransactionId txId = messageSend.getTransactionId();
+      if (txId != null)
+      {
+         putMarshalledProperty(coreMessage, AMQ_MSG_TX_ID, marshaller, txId);
+      }
+
+      String userId = messageSend.getUserID();
+      if (userId != null)
+      {
+         coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
+      }
+      coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageSend.isCompressed());
+      coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
+   }
+
+   /**
+    * Marshals {@code value} with the given wire format and stores the
+    * resulting bytes as a core bytes property named {@code key}.
+    */
+   private static void putMarshalledProperty(ServerMessageImpl coreMessage, String key, WireFormat marshaller, Object value) throws IOException
+   {
+      ByteSequence bytes = marshaller.marshal(value);
+      // compact so that bytes.data holds exactly the marshalled content
+      bytes.compact();
+      coreMessage.putBytesProperty(key, bytes.data);
+   }
+
+   /** Joins the values of the given broker ids into one comma separated string. */
+   private static String joinBrokerIds(BrokerId[] ids)
+   {
+      StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < ids.length; i++)
+      {
+         if (i > 0)
+         {
+            builder.append(","); //is this separator safe?
+         }
+         builder.append(ids[i].getValue());
+      }
+      return builder.toString();
+   }
+
+   /** Re-encodes an ActiveMQ message body into the core body buffer for the given core type. */
+   private static void writeCoreBody(HornetQBuffer body, byte coreType, ByteSequence contents) throws IOException
+   {
+      switch (coreType)
+      {
+         case org.hornetq.api.core.Message.TEXT_TYPE:
+            DataInputStream textIn = new DataInputStream(new ByteArrayInputStream(contents));
+            String text = MarshallingSupport.readUTF8(textIn);
+            textIn.close();
+            body.writeNullableSimpleString(new SimpleString(text));
+            break;
+         case org.hornetq.api.core.Message.MAP_TYPE:
+            InputStream mapStream = new ByteArrayInputStream(contents);
+            DataInputStream mapIn = new DataInputStream(mapStream);
+            Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mapIn);
+            mapIn.close();
+            TypedProperties props = new TypedProperties();
+            loadMapIntoProperties(props, map);
+            props.encode(body);
+            break;
+         case org.hornetq.api.core.Message.OBJECT_TYPE:
+            // length-prefixed copy of the serialized object
+            body.writeInt(contents.length);
+            body.writeBytes(contents.data, contents.offset, contents.length);
+            break;
+         case org.hornetq.api.core.Message.STREAM_TYPE:
+            writeCoreStreamBody(body, contents);
+            break;
+         default:
+            body.writeBytes(contents.data, contents.offset, contents.length);
+            break;
+      }
+   }
+
+   /**
+    * Translates a stream message body, one typed element at a time, from
+    * the ActiveMQ {@link MarshallingSupport} type tags to the core
+    * {@link DataConstants} type tags.
+    */
+   private static void writeCoreStreamBody(HornetQBuffer body, ByteSequence contents) throws IOException
+   {
+      InputStream sis = new ByteArrayInputStream(contents);
+      DataInputStream sdis = new DataInputStream(sis);
+      int stype = sdis.read();
+      while (stype != -1)
+      {
+         switch (stype)
+         {
+            case MarshallingSupport.BOOLEAN_TYPE:
+               body.writeByte(DataConstants.BOOLEAN);
+               body.writeBoolean(sdis.readBoolean());
+               break;
+            case MarshallingSupport.BYTE_TYPE:
+               body.writeByte(DataConstants.BYTE);
+               body.writeByte(sdis.readByte());
+               break;
+            case MarshallingSupport.BYTE_ARRAY_TYPE:
+               body.writeByte(DataConstants.BYTES);
+               int slen = sdis.readInt();
+               byte[] sbytes = new byte[slen];
+               // readFully instead of read: read() may return fewer bytes than
+               // requested and its return value was previously ignored
+               sdis.readFully(sbytes);
+               body.writeInt(slen);
+               body.writeBytes(sbytes);
+               break;
+            case MarshallingSupport.CHAR_TYPE:
+               body.writeByte(DataConstants.CHAR);
+               char schar = sdis.readChar();
+               body.writeShort((short)schar);
+               break;
+            case MarshallingSupport.DOUBLE_TYPE:
+               body.writeByte(DataConstants.DOUBLE);
+               double sdouble = sdis.readDouble();
+               body.writeLong(Double.doubleToLongBits(sdouble));
+               break;
+            case MarshallingSupport.FLOAT_TYPE:
+               body.writeByte(DataConstants.FLOAT);
+               float sfloat = sdis.readFloat();
+               body.writeInt(Float.floatToIntBits(sfloat));
+               break;
+            case MarshallingSupport.INTEGER_TYPE:
+               body.writeByte(DataConstants.INT);
+               body.writeInt(sdis.readInt());
+               break;
+            case MarshallingSupport.LONG_TYPE:
+               body.writeByte(DataConstants.LONG);
+               body.writeLong(sdis.readLong());
+               break;
+            case MarshallingSupport.SHORT_TYPE:
+               body.writeByte(DataConstants.SHORT);
+               body.writeShort(sdis.readShort());
+               break;
+            case MarshallingSupport.STRING_TYPE:
+               body.writeByte(DataConstants.STRING);
+               String sstring = sdis.readUTF();
+               body.writeNullableString(sstring);
+               break;
+            case MarshallingSupport.BIG_STRING_TYPE:
+               body.writeByte(DataConstants.STRING);
+               String sbigString = MarshallingSupport.readUTF8(sdis);
+               body.writeNullableString(sbigString);
+               break;
+            case MarshallingSupport.NULL:
+               // a null element is encoded as a null string
+               body.writeByte(DataConstants.STRING);
+               body.writeNullableString(null);
+               break;
+            default:
+               //something we don't know, ignore
+               break;
+         }
+         stype = sdis.read();
+      }
+      sdis.close();
+   }
+
+   /**
+    * Copies every entry of {@code map} into {@code props}.
+    * Values of type {@link UTF8Buffer} are converted to their string form
+    * before being stored.
+    *
+    * @param props the typed properties to fill
+    * @param map   the source entries
+    */
+   private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map)
+   {
+      for (Entry<String, Object> mapping : map.entrySet())
+      {
+         Object converted = mapping.getValue();
+         if (converted instanceof UTF8Buffer)
+         {
+            converted = ((UTF8Buffer)converted).toString();
+         }
+         SimpleString propertyName = new SimpleString(mapping.getKey());
+         TypedProperties.setObjectProperty(propertyName, converted, props);
+      }
+   }
+
+   /**
+    * Maps an ActiveMQ message data structure type to the matching core
+    * message type.
+    *
+    * @param amqType one of the {@link CommandTypes} message type constants
+    * @return the corresponding core message type constant
+    * @throws IllegalStateException for BLOB messages (not supported) and for
+    *                               any type not listed here
+    */
+   public static byte toCoreType(byte amqType)
+   {
+      final byte mapped;
+      switch (amqType)
+      {
+         case CommandTypes.ACTIVEMQ_MESSAGE:
+            mapped = org.hornetq.api.core.Message.DEFAULT_TYPE;
+            break;
+         case CommandTypes.ACTIVEMQ_TEXT_MESSAGE:
+            mapped = org.hornetq.api.core.Message.TEXT_TYPE;
+            break;
+         case CommandTypes.ACTIVEMQ_BYTES_MESSAGE:
+            mapped = org.hornetq.api.core.Message.BYTES_TYPE;
+            break;
+         case CommandTypes.ACTIVEMQ_MAP_MESSAGE:
+            mapped = org.hornetq.api.core.Message.MAP_TYPE;
+            break;
+         case CommandTypes.ACTIVEMQ_OBJECT_MESSAGE:
+            mapped = org.hornetq.api.core.Message.OBJECT_TYPE;
+            break;
+         case CommandTypes.ACTIVEMQ_STREAM_MESSAGE:
+            mapped = org.hornetq.api.core.Message.STREAM_TYPE;
+            break;
+         case CommandTypes.ACTIVEMQ_BLOB_MESSAGE:
+            throw new IllegalStateException("We don't support BLOB type yet!");
+         default:
+            throw new IllegalStateException("Unknown ActiveMQ message type: " + amqType);
+      }
+      return mapped;
+   }
+
+   /**
+    * Builds the dispatch command that delivers a core message to an
+    * OpenWire consumer.
+    * <p>
+    * The core message is converted to an ActiveMQ message with the
+    * consumer's marshaller; the dispatch destination is taken from the
+    * converted message.
+    *
+    * @param message       the core message to deliver
+    * @param deliveryCount value stored as the dispatch redelivery counter
+    * @param consumer      the target consumer, supplying id and marshaller
+    * @return the populated dispatch command
+    * @throws IOException if the message conversion fails
+    */
+   public static MessageDispatch createMessageDispatch(ServerMessage message,
+         int deliveryCount, AMQConsumer consumer) throws IOException
+   {
+      final WireFormat consumerFormat = consumer.getMarshaller();
+      final ActiveMQMessage converted = toAMQMessage(message, consumerFormat);
+
+      final MessageDispatch dispatch = new MessageDispatch();
+      dispatch.setConsumerId(consumer.getId());
+      dispatch.setMessage(converted);
+      dispatch.setRedeliveryCounter(deliveryCount);
+      dispatch.setDestination(converted.getDestination());
+
+      return dispatch;
+   }
+
+   /**
+    * Converts a core message into the ActiveMQ message of the matching
+    * type.
+    * <p>
+    * Header fields are copied, the body is re-encoded for ActiveMQ, the
+    * ActiveMQ specific attributes are restored from the core properties
+    * written by {@code toCoreMessage}, and every remaining property whose
+    * name does not start with {@code "__HQ"} or {@code "__HDR_"} is copied
+    * as a message property. Attributes that are absent (the message may come
+    * from a non-ActiveMQ client) fall back to defaults or are left unset.
+    *
+    * @param coreMessage the core message to convert
+    * @param marshaller  wire format used to unmarshal the ActiveMQ
+    *                    structures stored as byte properties
+    * @return the converted ActiveMQ message
+    * @throws IOException if the body cannot be re-encoded, a structure
+    *                     cannot be unmarshalled, or a property cannot be set
+    * @throws IllegalStateException if the core message type is unknown
+    */
+   private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller) throws IOException
+   {
+      byte coreType = coreMessage.getType();
+      ActiveMQMessage amqMsg = newAMQMessage(coreType);
+
+      String type = coreMessage.getStringProperty(new SimpleString("JMSType"));
+      if (type != null)
+      {
+         amqMsg.setJMSType(type);
+      }
+      amqMsg.setPersistent(coreMessage.isDurable());
+      amqMsg.setExpiration(coreMessage.getExpiration());
+      amqMsg.setPriority(coreMessage.getPriority());
+      amqMsg.setTimestamp(coreMessage.getTimestamp());
+
+      Long brokerInTime = (Long) coreMessage.getObjectProperty(AMQ_MSG_BROKER_IN_TIME);
+      if (brokerInTime == null)
+      {
+         brokerInTime = 0L;
+      }
+      amqMsg.setBrokerInTime(brokerInTime);
+
+      HornetQBuffer buffer = coreMessage.getBodyBuffer();
+      if (buffer != null)
+      {
+         byte[] bytes;
+         synchronized (buffer)
+         {
+            // rewind under the lock so the rewind and the read cannot be
+            // interleaved with another thread reading the same buffer
+            buffer.resetReaderIndex();
+            try
+            {
+               bytes = readAMQContent(buffer, coreType);
+            }
+            finally
+            {
+               buffer.resetReaderIndex();// this is important for topics as the buffer
+                                         // may be read multiple times
+            }
+         }
+
+         if (bytes != null)
+         {
+            amqMsg.setContent(new ByteSequence(bytes));
+         }
+      }
+
+      //we need check null because messages may come from other clients
+      //and those amq specific attribute may not be set.
+      Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
+      if (arrival == null)
+      {
+         //messages from other sources (like core client) may not set this prop
+         arrival = 0L;
+      }
+      amqMsg.setArrival(arrival);
+
+      // only restore a path that is actually present (the check used to be inverted)
+      String brokerPath = readStringProperty(coreMessage, AMQ_MSG_BROKER_PATH);
+      if (brokerPath != null && !brokerPath.isEmpty())
+      {
+         amqMsg.setBrokerPath(parseBrokerIds(brokerPath));
+      }
+
+      String clusterPath = readStringProperty(coreMessage, AMQ_MSG_CLUSTER);
+      if (clusterPath != null && !clusterPath.isEmpty())
+      {
+         amqMsg.setCluster(parseBrokerIds(clusterPath));
+      }
+
+      Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
+      if (commandId == null)
+      {
+         commandId = -1;
+      }
+      amqMsg.setCommandId(commandId);
+
+      String corrId = readStringProperty(coreMessage, "JMSCorrelationID");
+      if (corrId != null)
+      {
+         amqMsg.setCorrelationId(corrId);
+      }
+
+      ByteSequence dsSeq = readBytesProperty(coreMessage, AMQ_MSG_DATASTRUCTURE);
+      if (dsSeq != null)
+      {
+         amqMsg.setDataStructure((DataStructure) marshaller.unmarshal(dsSeq));
+      }
+
+      ByteSequence destSeq = readBytesProperty(coreMessage, AMQ_MSG_DESTINATION);
+      if (destSeq != null)
+      {
+         amqMsg.setDestination((ActiveMQDestination) marshaller.unmarshal(destSeq));
+      }
+
+      String groupId = readStringProperty(coreMessage, AMQ_MSG_GROUP_ID);
+      if (groupId != null)
+      {
+         amqMsg.setGroupID(groupId);
+      }
+
+      Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE);
+      if (groupSequence == null)
+      {
+         groupSequence = -1;
+      }
+      amqMsg.setGroupSequence(groupSequence);
+
+      MessageId mid;
+      ByteSequence midSeq = readBytesProperty(coreMessage, AMQ_MSG_MESSAGE_ID);
+      if (midSeq != null)
+      {
+         mid = (MessageId) marshaller.unmarshal(midSeq);
+      }
+      else
+      {
+         // no ActiveMQ id stored: generate one
+         mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1");
+      }
+      amqMsg.setMessageId(mid);
+
+      ByteSequence origDestSeq = readBytesProperty(coreMessage, AMQ_MSG_ORIG_DESTINATION);
+      if (origDestSeq != null)
+      {
+         amqMsg.setOriginalDestination((ActiveMQDestination) marshaller.unmarshal(origDestSeq));
+      }
+
+      ByteSequence origTxIdSeq = readBytesProperty(coreMessage, AMQ_MSG_ORIG_TXID);
+      if (origTxIdSeq != null)
+      {
+         amqMsg.setOriginalTransactionId((TransactionId) marshaller.unmarshal(origTxIdSeq));
+      }
+
+      ByteSequence producerIdSeq = readBytesProperty(coreMessage, AMQ_MSG_PRODUCER_ID);
+      if (producerIdSeq != null)
+      {
+         amqMsg.setProducerId((ProducerId) marshaller.unmarshal(producerIdSeq));
+      }
+
+      ByteSequence marshalledProps = readBytesProperty(coreMessage, AMQ_MSG_MARSHALL_PROP);
+      if (marshalledProps != null)
+      {
+         amqMsg.setMarshalledProperties(marshalledProps);
+      }
+
+      Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER);
+      if (redeliveryCounter != null)
+      {
+         amqMsg.setRedeliveryCounter(redeliveryCounter);
+      }
+
+      ByteSequence replyToSeq = readBytesProperty(coreMessage, AMQ_MSG_REPLY_TO);
+      if (replyToSeq != null)
+      {
+         amqMsg.setReplyTo((ActiveMQDestination) marshaller.unmarshal(replyToSeq));
+      }
+
+      ByteSequence consumerIdSeq = readBytesProperty(coreMessage, AMQ_MSG_CONSUMER_ID);
+      if (consumerIdSeq != null)
+      {
+         amqMsg.setTargetConsumerId((ConsumerId) marshaller.unmarshal(consumerIdSeq));
+      }
+
+      ByteSequence txIdSeq = readBytesProperty(coreMessage, AMQ_MSG_TX_ID);
+      if (txIdSeq != null)
+      {
+         amqMsg.setTransactionId((TransactionId) marshaller.unmarshal(txIdSeq));
+      }
+
+      String userId = readStringProperty(coreMessage, AMQ_MSG_USER_ID);
+      if (userId != null)
+      {
+         amqMsg.setUserID(userId);
+      }
+
+      Boolean isCompressed = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+      if (isCompressed != null)
+      {
+         amqMsg.setCompressed(isCompressed);
+      }
+      Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
+      if (isDroppable != null)
+      {
+         amqMsg.setDroppable(isDroppable);
+      }
+
+      String dlqCause = readStringProperty(coreMessage, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+      if (dlqCause != null)
+      {
+         try
+         {
+            amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause);
+         }
+         catch (JMSException e)
+         {
+            throw new IOException("failure to set dlq property " + dlqCause, e);
+         }
+      }
+      Set<SimpleString> props = coreMessage.getPropertyNames();
+      if (props != null)
+      {
+         for (SimpleString s : props)
+         {
+            String keyStr = s.toString();
+            // internal HornetQ and converter bookkeeping properties are not exposed
+            if (keyStr.startsWith("__HQ") || keyStr.startsWith("__HDR_"))
+            {
+               continue;
+            }
+            Object prop = coreMessage.getObjectProperty(s);
+            try
+            {
+               if (prop instanceof SimpleString)
+               {
+                  amqMsg.setObjectProperty(keyStr, prop.toString());
+               }
+               else
+               {
+                  amqMsg.setObjectProperty(keyStr, prop);
+               }
+            }
+            catch (JMSException e)
+            {
+               throw new IOException("exception setting property " + s + " : " + prop, e);
+            }
+         }
+      }
+      return amqMsg;
+   }
+
+   /** Creates the empty ActiveMQ message that corresponds to the given core message type. */
+   private static ActiveMQMessage newAMQMessage(byte coreType)
+   {
+      switch (coreType)
+      {
+         case org.hornetq.api.core.Message.BYTES_TYPE:
+            return new ActiveMQBytesMessage();
+         case org.hornetq.api.core.Message.MAP_TYPE:
+            return new ActiveMQMapMessage();
+         case org.hornetq.api.core.Message.OBJECT_TYPE:
+            return new ActiveMQObjectMessage();
+         case org.hornetq.api.core.Message.STREAM_TYPE:
+            return new ActiveMQStreamMessage();
+         case org.hornetq.api.core.Message.TEXT_TYPE:
+            return new ActiveMQTextMessage();
+         case org.hornetq.api.core.Message.DEFAULT_TYPE:
+            return new ActiveMQMessage();
+         default:
+            throw new IllegalStateException("Unknown message type: " + coreType);
+      }
+   }
+
+   /**
+    * Reads a property as a string regardless of whether the message hands
+    * it back as a {@code String} or as a {@link SimpleString}; returns
+    * {@code null} when the property is absent.
+    */
+   private static String readStringProperty(ServerMessage coreMessage, String key)
+   {
+      Object value = coreMessage.getObjectProperty(key);
+      return value == null ? null : value.toString();
+   }
+
+   /** Reads a bytes property wrapped in a {@link ByteSequence}, or {@code null} when absent. */
+   private static ByteSequence readBytesProperty(ServerMessage coreMessage, String key)
+   {
+      byte[] bytes = (byte[]) coreMessage.getObjectProperty(key);
+      return bytes == null ? null : new ByteSequence(bytes);
+   }
+
+   /** Splits a comma separated list of broker id values into broker ids. */
+   private static BrokerId[] parseBrokerIds(String path)
+   {
+      String[] values = path.split(",");
+      BrokerId[] bids = new BrokerId[values.length];
+      for (int i = 0; i < bids.length; i++)
+      {
+         bids[i] = new BrokerId(values[i]);
+      }
+      return bids;
+   }
+
+   /**
+    * Re-encodes the core body held in {@code buffer} as ActiveMQ message
+    * content. The caller is responsible for positioning the reader index
+    * and for locking the buffer.
+    *
+    * @return the encoded content, or {@code null} for a text message whose
+    *         text is null
+    */
+   private static byte[] readAMQContent(HornetQBuffer buffer, byte coreType) throws IOException
+   {
+      if (coreType == org.hornetq.api.core.Message.TEXT_TYPE)
+      {
+         SimpleString text = buffer.readNullableSimpleString();
+         if (text == null)
+         {
+            return null;
+         }
+         ByteArrayOutputStream out = new ByteArrayOutputStream(text.length() + 4);
+         DataOutputStream dataOut = new DataOutputStream(out);
+         MarshallingSupport.writeUTF8(dataOut, text.toString());
+         dataOut.flush();
+         byte[] textBytes = out.toByteArray();
+         dataOut.close();
+         return textBytes;
+      }
+      if (coreType == org.hornetq.api.core.Message.MAP_TYPE)
+      {
+         TypedProperties mapData = new TypedProperties();
+         mapData.decode(buffer);
+
+         Map<String, Object> map = mapData.getMap();
+         ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+         DataOutputStream dataOut = new DataOutputStream(out);
+         MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+         dataOut.flush();
+         byte[] mapBytes = out.toByteArray();
+         dataOut.close();
+         return mapBytes;
+      }
+      if (coreType == org.hornetq.api.core.Message.OBJECT_TYPE)
+      {
+         // the object body is stored length-prefixed
+         int len = buffer.readInt();
+         byte[] objectBytes = new byte[len];
+         buffer.readBytes(objectBytes);
+         return objectBytes;
+      }
+      if (coreType == org.hornetq.api.core.Message.STREAM_TYPE)
+      {
+         return readAMQStreamContent(buffer);
+      }
+
+      int n = buffer.readableBytes();
+      byte[] rawBytes = new byte[n];
+      buffer.readBytes(rawBytes);
+      return rawBytes;
+   }
+
+   /**
+    * Translates a core stream body, one typed element at a time, from the
+    * {@link DataConstants} type tags to the ActiveMQ marshalled form.
+    * Reading stops at the first unknown type tag.
+    */
+   private static byte[] readAMQStreamContent(HornetQBuffer buffer) throws IOException
+   {
+      ByteArrayOutputStream out = new ByteArrayOutputStream(buffer.readableBytes());
+      DataOutputStream dataOut = new DataOutputStream(out);
+
+      boolean stop = false;
+      while (!stop && buffer.readable())
+      {
+         byte primitiveType = buffer.readByte();
+         switch (primitiveType)
+         {
+            case DataConstants.BOOLEAN:
+               MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+               break;
+            case DataConstants.BYTE:
+               MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+               break;
+            case DataConstants.BYTES:
+               int len = buffer.readInt();
+               byte[] bytesData = new byte[len];
+               buffer.readBytes(bytesData);
+               MarshallingSupport.marshalByteArray(dataOut, bytesData);
+               break;
+            case DataConstants.CHAR:
+               char ch = (char)buffer.readShort();
+               MarshallingSupport.marshalChar(dataOut, ch);
+               break;
+            case DataConstants.DOUBLE:
+               double doubleVal = Double.longBitsToDouble(buffer.readLong());
+               MarshallingSupport.marshalDouble(dataOut, doubleVal);
+               break;
+            case DataConstants.FLOAT:
+               float floatVal = Float.intBitsToFloat(buffer.readInt());
+               MarshallingSupport.marshalFloat(dataOut, floatVal);
+               break;
+            case DataConstants.INT:
+               MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+               break;
+            case DataConstants.LONG:
+               MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+               break;
+            case DataConstants.SHORT:
+               MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+               break;
+            case DataConstants.STRING:
+               String string = buffer.readNullableString();
+               if (string == null)
+               {
+                  MarshallingSupport.marshalNull(dataOut);
+               }
+               else
+               {
+                  MarshallingSupport.marshalString(dataOut, string);
+               }
+               break;
+            default:
+               //now we stop
+               stop = true;
+               break;
+         }
+      }
+      dataOut.flush();
+      byte[] streamBytes = out.toByteArray();
+      dataOut.close();
+      return streamBytes;
+   }
+
+}


Mime
View raw message