activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [38/42] activemq-artemis git commit: ARTEMIS-463 Refactoring on Openwire https://issues.apache.org/jira/browse/ARTEMIS-463
Date Mon, 04 Apr 2016 16:09:47 GMT
ARTEMIS-463 Refactoring on Openwire
https://issues.apache.org/jira/browse/ARTEMIS-463

This was a team effort from Clebert Suconic and Howard Gao


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6ddf486f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6ddf486f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6ddf486f

Branch: refs/heads/master
Commit: 6ddf486f8f17776e14cef0c37ca2c9a6a891be7a
Parents: 2e66673
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Apr 1 16:29:55 2016 -0400
Committer: jbertram <jbertram@apache.org>
Committed: Mon Apr 4 11:08:43 2016 -0500

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnection.java    |    4 +
 .../protocol/AbstractRemotingConnection.java    |    5 +
 .../artemis/spi/core/remoting/Connection.java   |    6 +
 .../jms/server/impl/JMSServerManagerImpl.java   |   34 +-
 .../plug/ProtonSessionIntegrationCallback.java  |    5 +
 .../core/protocol/mqtt/MQTTSessionCallback.java |    5 +
 .../protocol/openwire/AMQTransactionImpl.java   |   14 +-
 .../protocol/openwire/OpenWireConnection.java   | 1527 +++++++++---------
 .../openwire/OpenWireMessageConverter.java      |   31 +-
 .../openwire/OpenWireProtocolManager.java       |  584 +++----
 .../core/protocol/openwire/OpenWireUtil.java    |   23 +-
 .../core/protocol/openwire/SendingResult.java   |   57 -
 .../amq/AMQCompositeConsumerBrokerExchange.java |    9 +-
 .../openwire/amq/AMQConnectionContext.java      |   19 +
 .../core/protocol/openwire/amq/AMQConsumer.java |  184 ++-
 .../openwire/amq/AMQConsumerBrokerExchange.java |   30 -
 .../core/protocol/openwire/amq/AMQProducer.java |   38 -
 .../openwire/amq/AMQProducerBrokerExchange.java |  145 --
 .../openwire/amq/AMQServerConsumer.java         |   89 +-
 .../protocol/openwire/amq/AMQServerSession.java |   15 +-
 .../openwire/amq/AMQServerSessionFactory.java   |    9 +
 .../core/protocol/openwire/amq/AMQSession.java  |  369 +++--
 .../protocol/openwire/amq/BrowserListener.java  |   22 -
 .../protocol/openwire/util/OpenWireUtil.java    |   83 +
 .../core/protocol/stomp/StompSession.java       |    5 +
 .../artemis/core/config/Configuration.java      |    4 +
 .../core/config/impl/ConfigurationImpl.java     |   19 +
 .../artemis/core/paging/PagingStore.java        |    2 +
 .../protocol/core/impl/CoreSessionCallback.java |    4 +
 .../core/remoting/impl/invm/InVMConnection.java |    6 +
 .../core/remoting/impl/netty/NettyAcceptor.java |    2 +
 .../server/impl/RemotingServiceImpl.java        |    6 +-
 .../artemis/core/server/ActiveMQServer.java     |    4 +
 .../artemis/core/server/ServerConsumer.java     |    6 +
 .../server/SlowConsumerDetectionListener.java   |   22 +
 .../core/server/embedded/EmbeddedActiveMQ.java  |    5 +
 .../core/server/impl/ActiveMQServerImpl.java    |   71 +
 .../artemis/core/server/impl/QueueImpl.java     |    2 +
 .../core/server/impl/ServerConsumerImpl.java    |   37 +-
 .../core/server/impl/ServerSessionImpl.java     |   56 +-
 .../core/settings/impl/AddressSettings.java     |   26 +
 .../spi/core/protocol/SessionCallback.java      |    3 +
 .../artemis/tests/util/ThreadLeakCheckRule.java |   14 +
 .../activemq/JmsRollbackRedeliveryTest.java     |    2 +-
 .../transport/SoWriteTimeoutClientTest.java     |    4 +-
 .../FailoverConsumerOutstandingCommitTest.java  |   24 +-
 .../failover/FailoverTransactionTest.java       |   70 +-
 .../integration/client/HangConsumerTest.java    |   11 +-
 .../integration/openwire/BasicSecurityTest.java |    9 +-
 .../integration/openwire/OpenWireUtilTest.java  |    2 +-
 .../openwire/SimpleOpenWireTest.java            |  572 +------
 .../core/postoffice/impl/BindingsImplTest.java  |   10 -
 .../unit/core/postoffice/impl/FakeQueue.java    |    5 -
 53 files changed, 1770 insertions(+), 2540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 9268699..3f10227 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -100,6 +100,10 @@ public class NettyConnection implements Connection {
    }
    // Connection implementation ----------------------------
 
+   @Override
+   public void setAutoRead(boolean autoRead) {
+      channel.config().setAutoRead(autoRead);
+   }
 
    @Override
    public synchronized boolean isWritable(ReadyListener callback) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index b759ccc..ee2449b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -104,6 +104,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
       return transportConnection.getID();
    }
 
+
+   public String getLocalAddress() {
+      return transportConnection.getLocalAddress();
+   }
+
    @Override
    public String getRemoteAddress() {
       return transportConnection.getRemoteAddress();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index ed10113..4352d49 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -44,6 +44,12 @@ public interface Connection {
    void fireReady(boolean ready);
 
    /**
+    * Enables or disables automatic reading from the channel.
+    * Disabling it is basically the same as blocking the reading.
+    * */
+   void setAutoRead(boolean autoRead);
+
+   /**
     * returns the unique id of this wire.
     *
     * @return the id

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index 35f584b..9872d0f 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -1047,28 +1047,32 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
    private boolean internalCreateQueue(final String queueName,
                                        final String selectorString,
                                        final boolean durable) throws Exception {
-      if (queues.get(queueName) != null) {
-         return false;
-      }
-      else {
-         ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
+      // TODO: there was an openwire test failing because of this
+      //       is this really needed for FailoverClusterTest ?
+      synchronized (queues) {
+         if (queues.get(queueName) != null) {
+            return false;
+         }
+         else {
+            ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
 
-         // Convert from JMS selector to core filter
-         String coreFilterString = null;
+            // Convert from JMS selector to core filter
+            String coreFilterString = null;
 
-         if (selectorString != null) {
-            coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
-         }
+            if (selectorString != null) {
+               coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
+            }
 
-         Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
+            Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
 
-         queues.put(queueName, activeMQQueue);
+            queues.put(queueName, activeMQQueue);
 
-         this.recoverregistryBindings(queueName, PersistedType.Queue);
+            this.recoverregistryBindings(queueName, PersistedType.Queue);
 
-         jmsManagementService.registerQueue(activeMQQueue, queue);
+            jmsManagementService.registerQueue(activeMQQueue, queue);
 
-         return true;
+            return true;
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 12aad22..5d6af2a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -92,6 +92,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
+   public void browserFinished(ServerConsumer consumer) {
+
+   }
+
+   @Override
    public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
 
       this.protonSession = protonSession;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 356dc73..28d86b8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -83,6 +83,11 @@ public class MQTTSessionCallback implements SessionCallback {
 
 
    @Override
+   public void browserFinished(ServerConsumer consumer) {
+
+   }
+
+   @Override
    public boolean hasCredits(ServerConsumer consumerID) {
       return true;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
index e356522..bbd7e95 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
@@ -28,22 +28,10 @@ public class AMQTransactionImpl extends TransactionImpl {
 
    private boolean rollbackForClose = false;
 
-   public AMQTransactionImpl(StorageManager storageManager, int timeoutSeconds) {
-      super(storageManager, timeoutSeconds);
-   }
-
-   public AMQTransactionImpl(StorageManager storageManager) {
-      super(storageManager);
-   }
-
    public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) {
       super(xid, storageManager, timeoutSeconds);
    }
 
-   public AMQTransactionImpl(long id, Xid xid, StorageManager storageManager) {
-      super(id, xid, storageManager);
-   }
-
    @Override
    public RefsOperation createRefsOperation(Queue queue) {
       return new AMQrefsOperation(queue, storageManager);
@@ -55,6 +43,8 @@ public class AMQTransactionImpl extends TransactionImpl {
          super(queue, storageManager);
       }
 
+
+      // This is because the Rollbacks happen through the consumer, not through the server's
       @Override
       public void afterRollback(Transaction tx) {
          if (rollbackForClose) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index c2c535e..e8259c3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -16,46 +16,54 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire;
 
+import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSSecurityException;
-import javax.jms.ResourceAllocationException;
+import javax.transaction.xa.XAResource;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -70,6 +78,7 @@ import org.apache.activemq.command.DataArrayResponse;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.IntegerResponse;
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -79,7 +88,6 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
@@ -101,47 +109,36 @@ import org.apache.activemq.wireformat.WireFormat;
 /**
  * Represents an activemq connection.
  */
-public class OpenWireConnection implements RemotingConnection, CommandVisitor, SecurityAuth {
+public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
 
    private final OpenWireProtocolManager protocolManager;
 
-   private final Connection transportConnection;
-
-   private final long creationTime;
-
-   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
-
-   private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
-
    private boolean destroyed = false;
 
    private final Object sendLock = new Object();
 
-   private volatile boolean dataReceived;
-
-   private final Acceptor acceptorUsed;
-
-   private OpenWireFormat wireFormat;
+   private final OpenWireFormat wireFormat;
 
    private AMQConnectionContext context;
 
-   private boolean pendingStop;
-
-   private Throwable stopError = null;
-
    private final AtomicBoolean stopping = new AtomicBoolean(false);
 
-   private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
-
-   protected final List<Command> dispatchQueue = new LinkedList<>();
-
    private boolean inServiceException;
 
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
 
+   // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
+   private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
+
+
    private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<>();
    private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<>();
 
+   // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
+   private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
+
+
+
    private ConnectionState state;
 
    private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>();
@@ -150,20 +147,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    private volatile AMQSession advisorySession;
 
-   public OpenWireConnection(Acceptor acceptorUsed,
-                             Connection connection,
+   // TODO-NOW: check on why there are two connections created for every createConnection on the client.
+   public OpenWireConnection(Connection connection,
+                             Executor executor,
                              OpenWireProtocolManager openWireProtocolManager,
                              OpenWireFormat wf) {
+      super(connection, executor);
       this.protocolManager = openWireProtocolManager;
-      this.transportConnection = connection;
-      this.acceptorUsed = acceptorUsed;
       this.wireFormat = wf;
-      this.creationTime = System.currentTimeMillis();
-   }
-
-   @Override
-   public boolean isWritable(ReadyListener callback) {
-      return transportConnection.isWritable(callback);
    }
 
    // SecurityAuth implementation
@@ -178,6 +169,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    // SecurityAuth implementation
    @Override
+   public RemotingConnection getRemotingConnection() {
+      return this;
+   }
+
+   // SecurityAuth implementation
+   @Override
    public String getPassword() {
       ConnectionInfo info = getConnectionInfo();
       if (info == null) {
@@ -186,7 +183,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       return info.getPassword();
    }
 
-
    private ConnectionInfo getConnectionInfo() {
       if (state == null) {
          return null;
@@ -198,19 +194,22 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       return info;
    }
 
-   public String getLocalAddress() {
-      return transportConnection.getLocalAddress();
-   }
-
    @Override
    public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+      super.bufferReceived(connectionID, buffer);
       try {
-         dataReceived = true;
+
+         // TODO-NOW: set OperationContext
 
          Command command = (Command) wireFormat.unmarshal(buffer);
 
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
+
+
+         // TODO-NOW: the server should send packets to the client based on the requested times
+         //           need to look at what Andy did on AMQP
+
          // the connection handles pings, negotiations directly.
          // and delegate all other commands to manager.
          if (command.getClass() == KeepAliveInfo.class) {
@@ -218,43 +217,36 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
             info.setResponseRequired(false);
             // if we don't respond to KeepAlive commands then the client will think the server is dead and timeout
             // for some reason KeepAliveInfo.isResponseRequired() is always false
-            protocolManager.sendReply(this, info);
-         }
-         else if (command.getClass() == WireFormatInfo.class) {
-            // amq here starts a read/write monitor thread (detect ttl?)
-            negotiate((WireFormatInfo) command);
+            sendCommand(info);
          }
-         else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class ||
-                  command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) ||
-                  command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class ||
-                  command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
+         else {
             Response response = null;
 
-            if (pendingStop) {
-               response = new ExceptionResponse(this.stopError);
+            try {
+               setLastCommand(command);
+               response = command.visit(commandProcessorInstance);
             }
-            else {
-               try {
-                  response = command.visit(this);
-               }
-               catch (Exception e) {
-                  if (responseRequired) {
-                     response = new ExceptionResponse(e);
-                  }
+            catch (Exception e) {
+               if (responseRequired) {
+                  response = new ExceptionResponse(e);
                }
+            }
+            finally {
+               setLastCommand(null);
+            }
 
-               if (response instanceof ExceptionResponse) {
-                  if (!responseRequired) {
-                     Throwable cause = ((ExceptionResponse) response).getException();
-                     serviceException(cause);
-                     response = null;
-                  }
+            if (response instanceof ExceptionResponse) {
+               if (!responseRequired) {
+                  Throwable cause = ((ExceptionResponse) response).getException();
+                  serviceException(cause);
+                  response = null;
                }
             }
 
             if (responseRequired) {
                if (response == null) {
                   response = new Response();
+                  response.setCorrelationId(command.getCommandId());
                }
             }
 
@@ -267,177 +259,50 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
                }
             }
 
+            // TODO-NOW: response through operation-context
+
             if (response != null && !protocolManager.isStopping()) {
                response.setCorrelationId(commandId);
                dispatchSync(response);
             }
 
          }
-         else {
-            // note!!! wait for negotiation (e.g. use a countdown latch)
-            // before handling any other commands
-            this.protocolManager.handleCommand(this, command);
-         }
-      }
-      catch (IOException e) {
-         ActiveMQServerLogger.LOGGER.error("error decoding", e);
-      }
-      catch (Throwable t) {
-         ActiveMQServerLogger.LOGGER.error("error decoding", t);
-      }
-   }
-
-   private void negotiate(WireFormatInfo command) throws IOException {
-      this.wireFormat.renegotiateWireFormat(command);
-      //throw back a brokerInfo here
-      protocolManager.sendBrokerInfo(this);
-   }
-
-   @Override
-   public Object getID() {
-      return transportConnection.getID();
-   }
-
-   @Override
-   public long getCreationTime() {
-      return creationTime;
-   }
-
-   @Override
-   public String getRemoteAddress() {
-      return transportConnection.getRemoteAddress();
-   }
-
-   @Override
-   public void addFailureListener(FailureListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("FailureListener cannot be null");
       }
+      catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.debug(e);
 
-      failureListeners.add(listener);
-   }
-
-   @Override
-   public boolean removeFailureListener(FailureListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("FailureListener cannot be null");
+         sendException(e);
       }
-
-      return failureListeners.remove(listener);
    }
 
-   @Override
-   public void addCloseListener(CloseListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("CloseListener cannot be null");
+   public void sendException(Exception e) {
+      Response resp;
+      if (e instanceof ActiveMQSecurityException) {
+         resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
       }
-
-      closeListeners.add(listener);
-   }
-
-   @Override
-   public boolean removeCloseListener(CloseListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("CloseListener cannot be null");
+      else if (e instanceof ActiveMQNonExistentQueueException) {
+         resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
       }
-
-      return closeListeners.remove(listener);
-   }
-
-   @Override
-   public List<CloseListener> removeCloseListeners() {
-      List<CloseListener> ret = new ArrayList<>(closeListeners);
-
-      closeListeners.clear();
-
-      return ret;
-   }
-
-   @Override
-   public void setCloseListeners(List<CloseListener> listeners) {
-      closeListeners.clear();
-
-      closeListeners.addAll(listeners);
-   }
-
-   @Override
-   public List<FailureListener> getFailureListeners() {
-      // we do not return the listeners otherwise the remoting service
-      // would NOT destroy the connection.
-      return Collections.emptyList();
-   }
-
-   @Override
-   public List<FailureListener> removeFailureListeners() {
-      List<FailureListener> ret = new ArrayList<>(failureListeners);
-
-      failureListeners.clear();
-
-      return ret;
-   }
-
-   @Override
-   public void setFailureListeners(List<FailureListener> listeners) {
-      failureListeners.clear();
-
-      failureListeners.addAll(listeners);
-   }
-
-   @Override
-   public ActiveMQBuffer createTransportBuffer(int size) {
-      return ActiveMQBuffers.dynamicBuffer(size);
-   }
-
-   @Override
-   public void fail(ActiveMQException me) {
-      if (me != null) {
-         ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+      else {
+         resp = new ExceptionResponse(e);
       }
-
-      // Then call the listeners
-      callFailureListeners(me);
-
-      callClosingListeners();
-
-      destroyed = true;
-
-      transportConnection.close();
-   }
-
-   @Override
-   public void destroy() {
-      destroyed = true;
-
-      transportConnection.close();
-
       try {
-         deleteTempQueues();
-      }
-      catch (Exception e) {
-         //log warning
+         dispatch(resp);
       }
-
-      synchronized (sendLock) {
-         callClosingListeners();
+      catch (IOException e2) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
       }
    }
 
-   private void deleteTempQueues() throws Exception {
-      Iterator<ActiveMQDestination> tmpQs = tempQueues.iterator();
-      while (tmpQs.hasNext()) {
-         ActiveMQDestination q = tmpQs.next();
-         protocolManager.removeDestination(this, q);
+   private void setLastCommand(Command command) {
+      if (context != null) {
+         context.setLastCommand(command);
       }
    }
 
    @Override
-   public RemotingConnection getRemotingConnection() {
-      return this;
-   }
-
-   @Override
-   public Connection getTransportConnection() {
-      return this.transportConnection;
+   public void destroy() {
+      fail(null, null);
    }
 
    @Override
@@ -452,7 +317,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    @Override
    public void disconnect(boolean criticalError) {
-      fail(null);
+      this.disconnect(null, null, criticalError);
    }
 
    @Override
@@ -484,26 +349,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       }
    }
 
-   private void callClosingListeners() {
-      final List<CloseListener> listenersClone = new ArrayList<>(closeListeners);
-
-      for (final CloseListener listener : listenersClone) {
-         try {
-            listener.connectionClosed();
-         }
-         catch (final Throwable t) {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t);
-         }
-      }
-   }
-
-   // throw a WireFormatInfo to the peer
-   public void init() {
+   // send a WireFormatInfo to the peer
+   public void sendHandshake() {
       WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
-      protocolManager.send(this, info);
+      sendCommand(info);
    }
 
    public ConnectionState getState() {
@@ -527,140 +376,21 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    }
 
-   @Override
-   public Response processAddConnection(ConnectionInfo info) throws Exception {
-      WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
-      // Older clients should have been defaulting this field to true.. but
-      // they were not.
-      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
-         info.setClientMaster(true);
-      }
-
-      state = new ConnectionState(info);
-
-      context = new AMQConnectionContext();
-
-      state.reset(info);
-
-      // Setup the context.
-      String clientId = info.getClientId();
-      context.setBroker(protocolManager);
-      context.setClientId(clientId);
-      context.setClientMaster(info.isClientMaster());
-      context.setConnection(this);
-      context.setConnectionId(info.getConnectionId());
-      // for now we pass the manager as the connector and see what happens
-      // it should be related to activemq's Acceptor
-      context.setFaultTolerant(info.isFaultTolerant());
-      context.setUserName(info.getUserName());
-      context.setWireFormatInfo(wireFormatInfo);
-      context.setReconnect(info.isFailoverReconnect());
-      context.setConnectionState(state);
-      if (info.getClientIp() == null) {
-         info.setClientIp(getRemoteAddress());
-      }
-
-      try {
-         protocolManager.addConnection(context, info);
-      }
-      catch (Exception e) {
-         if (e instanceof SecurityException) {
-            // close this down - in case the peer of this transport doesn't play
-            // nice
-            delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
-         }
-         Response resp = new ExceptionResponse(e);
-         return resp;
-      }
-      if (info.isManageable()) {
-         // send ConnectionCommand
-         ConnectionControl command = new ConnectionControl();
-         command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
-         if (info.isFailoverReconnect()) {
-            command.setRebalanceConnection(false);
-         }
-         dispatchAsync(command);
-      }
-      return null;
-   }
-
-   public void dispatchAsync(Command message) {
-      if (!stopping.get()) {
-         dispatchSync(message);
-      }
-      else {
-         if (message.isMessageDispatch()) {
-            MessageDispatch md = (MessageDispatch) message;
-            TransmitCallback sub = md.getTransmitCallback();
-            protocolManager.postProcessDispatch(md);
-            if (sub != null) {
-               sub.onFailure();
-            }
-         }
-      }
-   }
-
-   public void dispatchSync(Command message) {
-      try {
-         processDispatch(message);
-      }
-      catch (IOException e) {
-         serviceExceptionAsync(e);
-      }
-   }
-
-   public void serviceExceptionAsync(final IOException e) {
-      if (asyncException.compareAndSet(false, true)) {
-         // Why this is not through an executor?
-         new Thread("Async Exception Handler") {
-            @Override
-            public void run() {
-               serviceException(e);
-            }
-         }.start();
-      }
+   public void dispatchAsync(Command message) throws Exception {
+      dispatchSync(message);
    }
 
-   public void serviceException(Throwable e) {
-      // are we a transport exception such as not being able to dispatch
-      // synchronously to a transport
-      if (e instanceof IOException) {
-         serviceTransportException((IOException) e);
-      }
-      else if (!stopping.get() && !inServiceException) {
-         inServiceException = true;
-         try {
-            ConnectionError ce = new ConnectionError();
-            ce.setException(e);
-            if (pendingStop) {
-               dispatchSync(ce);
-            }
-            else {
-               dispatchAsync(ce);
-            }
-         }
-         finally {
-            inServiceException = false;
-         }
-      }
+   public void dispatchSync(Command message) throws Exception {
+      processDispatch(message);
    }
 
-   public void serviceTransportException(IOException e) {
-      /*
-       * deal with it later BrokerService bService =
-       * connector.getBrokerService(); if (bService.isShutdownOnSlaveFailure())
-       * { if (brokerInfo != null) { if (brokerInfo.isSlaveBroker()) {
-       * LOG.error("Slave has exception: {} shutting down master now.",
-       * e.getMessage(), e); try { doStop(); bService.stop(); } catch (Exception
-       * ex) { LOG.warn("Failed to stop the master", ex); } } } } if
-       * (!stopping.get() && !pendingStop) { transportException.set(e); if
-       * (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: "
-       * + e, e); } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
-       * TRANSPORTLOG.warn(this + " failed: " + e); } stopAsync(); }
-       */
+   public void serviceException(Throwable e) throws Exception {
+      ConnectionError ce = new ConnectionError();
+      ce.setException(e);
+      dispatchAsync(ce);
    }
 
-   protected void dispatch(Command command) throws IOException {
+   public void dispatch(Command command) throws IOException {
       this.physicalSend(command);
    }
 
@@ -696,564 +426,775 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       }
    }
 
-   public void delayedStop(final int waitTime, final String reason, Throwable cause) {
-      if (waitTime > 0) {
-         synchronized (this) {
-            pendingStop = true;
-            stopError = cause;
+   private void addConsumerBrokerExchange(ConsumerId id,
+                                         AMQSession amqSession,
+                                         List<AMQConsumer> consumerList) {
+      AMQConsumerBrokerExchange result = consumerExchanges.get(id);
+      if (result == null) {
+         if (consumerList.size() == 1) {
+            result = new AMQSingleConsumerBrokerExchange(amqSession, consumerList.get(0));
          }
-      }
-   }
-
-   public void stopAsync() {
-      // If we're in the middle of starting then go no further... for now.
-      synchronized (this) {
-         pendingStop = true;
-      }
-      if (stopping.compareAndSet(false, true)) {
-         if (context != null) {
-            context.getStopping().set(true);
+         else {
+            result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerList);
+         }
+         synchronized (consumerExchanges) {
+            consumerExchanges.put(id, result);
          }
       }
    }
 
-   protected void doStop() throws Exception {
-      /*
-       * What's a duplex bridge? try { synchronized (this) { if (duplexBridge !=
-       * null) { duplexBridge.stop(); } } } catch (Exception ignore) {
-       * LOG.trace("Exception caught stopping. This exception is ignored.",
-       * ignore); }
-       */
-      try {
-         getTransportConnection().close();
-      }
-      catch (Exception e) {
-         // log
-      }
-
-      // Run the MessageDispatch callbacks so that message references get
-      // cleaned up.
-      synchronized (dispatchQueue) {
-         for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
-            Command command = iter.next();
-            if (command.isMessageDispatch()) {
-               MessageDispatch md = (MessageDispatch) command;
-               TransmitCallback sub = md.getTransmitCallback();
-               protocolManager.postProcessDispatch(md);
-               if (sub != null) {
-                  sub.onFailure();
+   private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
+      AMQProducerBrokerExchange result = producerExchanges.get(id);
+      if (result == null) {
+         synchronized (producerExchanges) {
+            result = new AMQProducerBrokerExchange();
+            result.setConnectionContext(context);
+            //todo implement reconnect https://issues.apache.org/jira/browse/ARTEMIS-194
+            //todo: this used to check for  && this.acceptorUsed.isAuditNetworkProducers()
+            if (context.isReconnect() || (context.isNetworkConnection())) {
+               // once ARTEMIS-194 is implemented, we need to set the storedSequenceID here somehow
+               // We have different semantics on Artemis Journal, but we could adapt something for this
+               // TBD during the implementation of ARTEMIS-194
+               result.setLastStoredSequenceId(0);
+            }
+            SessionState ss = state.getSessionState(id.getParentId());
+            if (ss != null) {
+               result.setProducerState(ss.getProducerState(id));
+               ProducerState producerState = ss.getProducerState(id);
+               if (producerState != null && producerState.getInfo() != null) {
+                  ProducerInfo info = producerState.getInfo();
                }
             }
-         }
-         dispatchQueue.clear();
-      }
-      //
-      // Remove all logical connection associated with this connection
-      // from the broker.
-      if (!protocolManager.isStopped()) {
-         context.getStopping().set(true);
-         try {
-            processRemoveConnection(state.getInfo().getConnectionId(), 0L);
-         }
-         catch (Throwable ignore) {
-            ignore.printStackTrace();
+            producerExchanges.put(id, result);
          }
       }
+      return result;
    }
 
-   @Override
-   public Response processAddConsumer(ConsumerInfo info) {
-      Response resp = null;
-      try {
-         protocolManager.addConsumer(this, info);
-      }
-      catch (Exception e) {
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
+   private void removeConsumerBrokerExchange(ConsumerId id) {
+      synchronized (consumerExchanges) {
+         consumerExchanges.remove(id);
       }
-      return resp;
    }
 
-   public void addConsumerBrokerExchange(ConsumerId id,
-                                         AMQSession amqSession,
-                                         Map<ActiveMQDestination, AMQConsumer> consumerMap) {
-      AMQConsumerBrokerExchange result = consumerExchanges.get(id);
-      if (result == null) {
-         if (consumerMap.size() == 1) {
-            result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next());
-         }
-         else {
-            result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap);
-         }
-         synchronized (consumerExchanges) {
-            result.setConnectionContext(context);
-            SessionState ss = state.getSessionState(id.getParentId());
-            if (ss != null) {
-               ConsumerState cs = ss.getConsumerState(id);
-               if (cs != null) {
-                  ConsumerInfo info = cs.getInfo();
-                  if (info != null) {
-                     if (info.getDestination() != null && info.getDestination().isPattern()) {
-                        result.setWildcard(true);
-                     }
-                  }
-               }
-            }
-            consumerExchanges.put(id, result);
-         }
+   public void deliverMessage(MessageDispatch dispatch) {
+      Message m = dispatch.getMessage();
+      if (m != null) {
+         long endTime = System.currentTimeMillis();
+         m.setBrokerOutTime(endTime);
       }
+
+      sendCommand(dispatch);
    }
 
-   public int getConsumerCount() {
-      int result = 0;
-      for (SessionId sessionId : state.getSessionIds()) {
-         SessionState sessionState = state.getSessionState(sessionId);
-         if (sessionState != null) {
-            result += sessionState.getConsumerIds().size();
-         }
-      }
-      return result;
+   public WireFormat getMarshaller() {
+      return this.wireFormat;
    }
 
-   public int getProducerCount() {
-      int result = 0;
-      for (SessionId sessionId : state.getSessionIds()) {
-         SessionState sessionState = state.getSessionState(sessionId);
-         if (sessionState != null) {
-            result += sessionState.getProducerIds().size();
-         }
-      }
-      return result;
+   public void registerTempQueue(ActiveMQDestination queue) {
+      tempQueues.add(queue);
    }
 
-   @Override
-   public Response processAddDestination(DestinationInfo dest) throws Exception {
-      Response resp = null;
-      try {
-         protocolManager.addDestination(this, dest);
+   private void shutdown(boolean fail) {
+      if (fail) {
+         transportConnection.forceClose();
       }
-      catch (Exception e) {
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
+      else {
+         transportConnection.close();
       }
-      return resp;
    }
 
-   @Override
-   public Response processAddProducer(ProducerInfo info) throws Exception {
-      Response resp = null;
-      try {
-         protocolManager.addProducer(this, info);
+   private void disconnect(ActiveMQException me, String reason, boolean fail)  {
+
+      if (context == null || destroyed) {
+         return;
       }
-      catch (Exception e) {
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else if (e instanceof ActiveMQNonExistentQueueException) {
-            resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
+      // Don't allow things to be added to the connection state while we
+      // are shutting down.
+      // Is it necessary? Do we even need state at all?
+      state.shutdown();
+
+      // Then call the listeners
+      // this should close the underlying sessions
+      callFailureListeners(me);
+
+      // this should clean up temp dests
+      synchronized (sendLock) {
+         callClosingListeners();
       }
-      return resp;
-   }
 
-   @Override
-   public Response processAddSession(SessionInfo info) throws Exception {
-      // Avoid replaying dup commands
-      if (!state.getSessionIds().contains(info.getSessionId())) {
-         protocolManager.addSession(this, info);
+      destroyed = true;
+
+      // before closing the transport, send the last response (if any)
+      Command command = context.getLastCommand();
+      if (command != null && command.isResponseRequired()) {
+         Response lastResponse = new Response();
+         lastResponse.setCorrelationId(command.getCommandId());
          try {
-            state.addSession(info);
+            dispatchSync(lastResponse);
          }
-         catch (IllegalStateException e) {
-            e.printStackTrace();
-            protocolManager.removeSession(context, info);
+         catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
          }
       }
-      return null;
    }
 
    @Override
-   public Response processBeginTransaction(TransactionInfo info) throws Exception {
-      TransactionId txId = info.getTransactionId();
-
-      if (!txMap.containsKey(txId)) {
-         txMap.put(txId, info);
-      }
-      return null;
+   public void disconnect(String reason, boolean fail) {
+      this.disconnect(null, reason, fail);
    }
 
    @Override
-   public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public void fail(ActiveMQException me, String message) {
+      if (me != null) {
+         ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+      }
+      try {
+         protocolManager.removeConnection(this.getConnectionInfo(), me);
+      }
+      catch (InvalidClientIDException e) {
+         ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e);
+      }
+      shutdown(true);
    }
 
-   @Override
-   public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-      protocolManager.commitTransactionOnePhase(info);
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
-
-      return null;
+   public void setAdvisorySession(AMQSession amqSession) {
+      this.advisorySession = amqSession;
    }
 
-   @Override
-   public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-      protocolManager.commitTransactionTwoPhase(info);
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
-
-      return null;
+   public AMQSession getAdvisorySession() {
+      return this.advisorySession;
    }
 
-   @Override
-   public Response processConnectionControl(ConnectionControl arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public AMQConnectionContext getContext() {
+      return this.context;
    }
 
-   @Override
-   public Response processConnectionError(ConnectionError arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public void updateClient(ConnectionControl control) throws Exception {
+      if (protocolManager.isUpdateClusterClients()) {
+         dispatchAsync(control);
+      }
    }
 
-   @Override
-   public Response processConsumerControl(ConsumerControl arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
+   public AMQConnectionContext initContext(ConnectionInfo info) {
+      WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
+      // Older clients should have been defaulting this field to true.. but
+      // they were not.
+      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
+         info.setClientMaster(true);
+      }
 
-   @Override
-   public Response processControlCommand(ControlCommand arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
+      state = new ConnectionState(info);
 
-   @Override
-   public Response processEndTransaction(TransactionInfo info) throws Exception {
-      protocolManager.endTransaction(info);
-      TransactionId txId = info.getTransactionId();
+      context = new AMQConnectionContext();
 
-      if (!txMap.containsKey(txId)) {
-         txMap.put(txId, info);
+      state.reset(info);
+
+      // Setup the context.
+      String clientId = info.getClientId();
+      context.setBroker(protocolManager);
+      context.setClientId(clientId);
+      context.setClientMaster(info.isClientMaster());
+      context.setConnection(this);
+      context.setConnectionId(info.getConnectionId());
+      // for now we pass the manager as the connector and see what happens
+      // it should be related to activemq's Acceptor
+      context.setFaultTolerant(info.isFaultTolerant());
+      context.setUserName(info.getUserName());
+      context.setWireFormatInfo(wireFormatInfo);
+      context.setReconnect(info.isFailoverReconnect());
+      context.setConnectionState(state);
+      if (info.getClientIp() == null) {
+         info.setClientIp(getRemoteAddress());
       }
-      return null;
-   }
 
-   @Override
-   public Response processFlush(FlushCommand arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+      return context;
    }
 
-   @Override
-   public Response processForgetTransaction(TransactionInfo info) throws Exception {
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
+   //raise the refCount of context
+   public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) {
+      this.context = existingContext;
+      WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
+      // Older clients should have been defaulting this field to true.. but
+      // they were not.
+      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
+         info.setClientMaster(true);
+      }
+      if (info.getClientIp() == null) {
+         info.setClientIp(getRemoteAddress());
+      }
 
-      protocolManager.forgetTransaction(info.getTransactionId());
-      return null;
-   }
+      state = new ConnectionState(info);
+      state.reset(info);
 
-   @Override
-   public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+      context.setConnection(this);
+      context.setConnectionState(state);
+      context.setClientMaster(info.isClientMaster());
+      context.setFaultTolerant(info.isFaultTolerant());
+      context.setReconnect(true);
+      context.incRefCount();
    }
 
-   @Override
-   public Response processMessage(Message messageSend) {
-      Response resp = null;
+   /**
+    * This will answer with commands to the client
+    */
+   public boolean sendCommand(final Command command) {
+      if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
+         ActiveMQServerLogger.LOGGER.trace("sending " + command);
+      }
+
+      if (isDestroyed()) {
+         return false;
+      }
+
       try {
-         ProducerId producerId = messageSend.getProducerId();
-         AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
-         final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
-         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
-         boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
+         physicalSend(command);
+      }
+      catch (Exception e) {
+         return false;
+      }
+      catch (Throwable t) {
+         return false;
+      }
+      return true;
+   }
 
-         AMQSession session = protocolManager.getSession(producerId.getParentId());
+   public void addDestination(DestinationInfo info) throws Exception {
+      ActiveMQDestination dest = info.getDestination();
+      if (dest.isQueue()) {
+         SimpleString qName = OpenWireUtil.toCoreAddress(dest);
+         QueueBinding binding = (QueueBinding) protocolManager.getServer().getPostOffice().getBinding(qName);
+         if (binding == null) {
+            if (getState().getInfo() != null) {
 
-         if (producerExchange.canDispatch(messageSend)) {
-            SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
-            if (result.isBlockNextSend()) {
-               if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
-                  throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
-               }
+               CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
+               protocolManager.getServer().getSecurityStore().check(qName, checkType, this);
 
-               if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
-                  //in that case don't send the response
-                  //this will force the client to wait until
-                  //the response is got.
-                  context.setDontSendReponse(true);
-               }
-               else {
-                  //hang the connection until the space is available
-                  session.blockingWaitForSpace(producerExchange, result);
-               }
-            }
-            else if (sendProducerAck) {
-               ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-               this.dispatchAsync(ack);
+               protocolManager.getServer().checkQueueCreationLimit(getUsername());
             }
+            ConnectionInfo connInfo = getState().getInfo();
+            protocolManager.getServer().createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
          }
-      }
-      catch (Throwable e) {
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+
+         if (dest.isTemporary()) {
+            registerTempQueue(dest);
          }
-         else {
-            resp = new ExceptionResponse(e);
+      }
+
+      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+         AMQConnectionContext context = getContext();
+         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
+
+         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+         protocolManager.fireAdvisory(context, topic, advInfo);
+      }
+   }
+
+
+   public void updateConsumer(ConsumerControl consumerControl) {
+      SessionId sessionId = consumerControl.getConsumerId().getParentId();
+      AMQSession amqSession = sessions.get(sessionId);
+      amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
+   }
+
+   public void addConsumer(ConsumerInfo info) throws Exception {
+      // Todo: add a destination interceptors holder here (amq supports this)
+      SessionId sessionId = info.getConsumerId().getParentId();
+      ConnectionId connectionId = sessionId.getParentId();
+      ConnectionState cs = getState();
+      if (cs == null) {
+         throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
+      }
+      SessionState ss = cs.getSessionState(sessionId);
+      if (ss == null) {
+         throw new IllegalStateException(protocolManager.getServer() + " Cannot add a consumer to a session that had not been registered: " + sessionId);
+      }
+      // Avoid replaying dup commands
+      if (!ss.getConsumerIds().contains(info.getConsumerId())) {
+
+         AMQSession amqSession = sessions.get(sessionId);
+         if (amqSession == null) {
+            throw new IllegalStateException("Session not exist! : " + sessionId);
          }
+
+         List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+
+         this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
+         ss.addConsumer(info);
+         amqSession.start();
       }
-      return resp;
    }
 
-   private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
-      AMQProducerBrokerExchange result = producerExchanges.get(id);
-      if (result == null) {
-         synchronized (producerExchanges) {
-            result = new AMQProducerBrokerExchange();
-            result.setConnectionContext(context);
-            //todo implement reconnect https://issues.apache.org/jira/browse/ARTEMIS-194
-            //todo: this used to check for  && this.acceptorUsed.isAuditNetworkProducers()
-            if (context.isReconnect() || (context.isNetworkConnection())) {
-               // once implemented ARTEMIS-194, we need to set the storedSequenceID here somehow
-               // We have different semantics on Artemis Journal, but we could adapt something for this
-               // TBD during the implemetnation of ARTEMIS-194
-               result.setLastStoredSequenceId(0);
+   class SlowConsumerDetection implements SlowConsumerDetectionListener {
+
+      @Override
+      public void onSlowConsumer(ServerConsumer consumer) {
+         if (consumer instanceof AMQServerConsumer) {
+            AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer;
+            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getOpenwireDestination());
+            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+            try {
+               advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString());
+               protocolManager.fireAdvisory(context, topic, advisoryMessage, serverConsumer.getAmqConsumer().getId());
             }
-            SessionState ss = state.getSessionState(id.getParentId());
-            if (ss != null) {
-               result.setProducerState(ss.getProducerState(id));
-               ProducerState producerState = ss.getProducerState(id);
-               if (producerState != null && producerState.getInfo() != null) {
-                  ProducerInfo info = producerState.getInfo();
-                  result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
-               }
+            catch (Exception e) {
+               // TODO-NOW: LOGGING
+               e.printStackTrace();
             }
-            producerExchanges.put(id, result);
          }
       }
-      return result;
    }
 
-   @Override
-   public Response processMessageAck(MessageAck ack) throws Exception {
-      AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
-      consumerBrokerExchange.acknowledge(ack);
-      return null;
+   public void addSessions(Set<SessionId> sessionSet) {
+      Iterator<SessionId> iter = sessionSet.iterator();
+      while (iter.hasNext()) {
+         SessionId sid = iter.next();
+         addSession(getState().getSessionState(sid).getInfo(), true);
+      }
    }
 
-   @Override
-   public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public AMQSession addSession(SessionInfo ss) {
+      return addSession(ss, false);
    }
 
-   @Override
-   public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public AMQSession addSession(SessionInfo ss, boolean internal) {
+      AMQSession amqSession = new AMQSession(getState().getInfo(), ss, protocolManager.getServer(), this, protocolManager.getScheduledPool(), protocolManager);
+      amqSession.initialize();
+      amqSession.setInternal(internal);
+      sessions.put(ss.getSessionId(), amqSession);
+      sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
+      return amqSession;
    }
 
-   @Override
-   public Response processMessagePull(MessagePull arg0) throws Exception {
-      AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
-      if (amqConsumerBrokerExchange == null) {
-         throw new IllegalStateException("Consumer does not exist");
+   public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
+      AMQSession session = sessions.remove(info.getSessionId());
+      if (session != null) {
+         session.close();
       }
-      amqConsumerBrokerExchange.processMessagePull(arg0);
-      return null;
    }
 
-   @Override
-   public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-      protocolManager.prepareTransaction(info);
-      return null;
+   public AMQSession getSession(SessionId sessionId) {
+      return sessions.get(sessionId);
    }
 
-   @Override
-   public Response processProducerAck(ProducerAck arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
+   public void removeDestination(ActiveMQDestination dest) throws Exception {
+      if (dest.isQueue()) {
+         SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
+         protocolManager.getServer().destroyQueue(qName);
+      }
+      else {
+         Bindings bindings = protocolManager.getServer().getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
+         Iterator<Binding> iterator = bindings.getBindings().iterator();
+
+         while (iterator.hasNext()) {
+            Queue b = (Queue) iterator.next().getBindable();
+            if (b.getConsumerCount() > 0) {
+               throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
+            }
+            if (b.isDurable()) {
+               throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
+            }
+            b.deleteQueue();
+         }
+      }
+
+      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+         AMQConnectionContext context = getContext();
+         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
+
+         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+         protocolManager.fireAdvisory(context, topic, advInfo);
+      }
    }
 
-   @Override
-   public Response processRecoverTransactions(TransactionInfo info) throws Exception {
-      Set<SessionId> sIds = state.getSessionIds();
-      TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
-      return new DataArrayResponse(recovered);
+   /**
+    * Checks to see if this destination exists.  If it does not, throw an invalid destination exception.
+    *
+    * @param destination
+    */
+   private void validateDestination(ActiveMQDestination destination) throws Exception {
+      if (destination.isQueue()) {
+         SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+         BindingQueryResult result = protocolManager.getServer().bindingQuery(physicalName);
+         if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
+            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
+         }
+      }
    }
 
-   @Override
-   public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
-      // Don't allow things to be added to the connection state while we
-      // are shutting down.
-      state.shutdown();
-      // Cascade the connection stop to the sessions.
-      for (SessionId sessionId : state.getSessionIds()) {
+
+   CommandProcessor commandProcessorInstance = new CommandProcessor();
+
+
+   // This will listen for commands through the protocolmanager
+   public class CommandProcessor implements CommandVisitor {
+
+      public AMQConnectionContext getContext() {
+         return OpenWireConnection.this.getContext();
+      }
+
+      @Override
+      public Response processAddConnection(ConnectionInfo info) throws Exception {
          try {
-            processRemoveSession(sessionId, lastDeliveredSequenceId);
+            protocolManager.addConnection(OpenWireConnection.this, info);
          }
-         catch (Throwable e) {
-            // LOG
+         catch (Exception e) {
+            Response resp = new ExceptionResponse(e);
+            return resp;
+         }
+         if (info.isManageable() && protocolManager.isUpdateClusterClients()) {
+            // send ConnectionCommand
+            ConnectionControl command = protocolManager.newConnectionControl();
+            command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
+            if (info.isFailoverReconnect()) {
+               command.setRebalanceConnection(false);
+            }
+            dispatchAsync(command);
          }
+         return null;
+
       }
 
-      try {
-         protocolManager.removeConnection(context, state.getInfo(), null);
+      @Override
+      public Response processAddProducer(ProducerInfo info) throws Exception {
+         SessionId sessionId = info.getProducerId().getParentId();
+         ConnectionState cs = getState();
+
+         if (cs == null) {
+            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + sessionId.getParentId());
+         }
+
+         SessionState ss = cs.getSessionState(sessionId);
+         if (ss == null) {
+            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+         }
+
+         // Avoid replaying dup commands
+         if (!ss.getProducerIds().contains(info.getProducerId())) {
+            ActiveMQDestination destination = info.getDestination();
+
+            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+               if (destination.isQueue()) {
+                  OpenWireConnection.this.validateDestination(destination);
+               }
+               DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
+               OpenWireConnection.this.addDestination(destInfo);
+            }
+
+            ss.addProducer(info);
+
+         }
+         return null;
       }
-      catch (Throwable e) {
-         // log
+
+      @Override
+      public Response processAddConsumer(ConsumerInfo info) throws Exception {
+         addConsumer(info);
+         return null;
       }
-      return null;
-   }
 
-   @Override
-   public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
-      SessionId sessionId = id.getParentId();
-      SessionState ss = state.getSessionState(sessionId);
-      if (ss == null) {
-         throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
+      @Override
+      public Response processRemoveDestination(DestinationInfo info) throws Exception {
+         ActiveMQDestination dest = info.getDestination();
+         removeDestination(dest);
+         return null;
+      }
+
+      @Override
+      public Response processRemoveProducer(ProducerId id) throws Exception {
+
+         // TODO-now: proper implement this method
+         return null;
       }
-      ConsumerState consumerState = ss.removeConsumer(id);
-      if (consumerState == null) {
-         throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
+
+      @Override
+      public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
+         SessionState session = state.getSessionState(id);
+         if (session == null) {
+            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
+         }
+         // Don't let new consumers or producers get added while we are closing
+         // this down.
+         session.shutdown();
+         // Cascade the connection stop producers.
+         // we don't stop consumer because in core
+         // closing the session will do the job
+         for (ProducerId producerId : session.getProducerIds()) {
+            try {
+               processRemoveProducer(producerId);
+            }
+            catch (Throwable e) {
+               // LOG.warn("Failed to remove producer: {}", producerId, e);
+            }
+         }
+         state.removeSession(id);
+         removeSession(context, session.getInfo());
+         return null;
       }
-      ConsumerInfo info = consumerState.getInfo();
-      info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
 
-      AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
+      @Override
+      public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
+         protocolManager.removeSubscription(subInfo);
+         return null;
+      }
 
-      consumerBrokerExchange.removeConsumer();
+      @Override
+      public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+         protocolManager.rollbackTransaction(info);
+         TransactionId txId = info.getTransactionId();
+         txMap.remove(txId);
+         return null;
+      }
 
-      removeConsumerBrokerExchange(id);
+      @Override
+      public Response processShutdown(ShutdownInfo info) throws Exception {
+         OpenWireConnection.this.shutdown(false);
+         return null;
+      }
 
-      return null;
-   }
+      @Override
+      public Response processWireFormat(WireFormatInfo command) throws Exception {
+         wireFormat.renegotiateWireFormat(command);
+         //throw back a brokerInfo here
+         protocolManager.sendBrokerInfo(OpenWireConnection.this);
+         return null;
+      }
 
-   private void removeConsumerBrokerExchange(ConsumerId id) {
-      synchronized (consumerExchanges) {
-         consumerExchanges.remove(id);
+      @Override
+      public Response processAddDestination(DestinationInfo dest) throws Exception {
+         Response resp = null;
+         try {
+            addDestination(dest);
+         }
+         catch (Exception e) {
+            if (e instanceof ActiveMQSecurityException) {
+               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+            }
+            else {
+               resp = new ExceptionResponse(e);
+            }
+         }
+         return resp;
       }
-   }
 
-   @Override
-   public Response processRemoveDestination(DestinationInfo info) throws Exception {
-      ActiveMQDestination dest = info.getDestination();
-      protocolManager.removeDestination(this, dest);
-      return null;
-   }
+      @Override
+      public Response processAddSession(SessionInfo info) throws Exception {
+         // Avoid replaying dup commands
+         if (!state.getSessionIds().contains(info.getSessionId())) {
+            addSession(info);
+            state.addSession(info);
+         }
+         return null;
+      }
 
-   @Override
-   public Response processRemoveProducer(ProducerId id) throws Exception {
-      protocolManager.removeProducer(id);
-      return null;
-   }
+      @Override
+      public Response processBeginTransaction(TransactionInfo info) throws Exception {
+         TransactionId txId = info.getTransactionId();
 
-   @Override
-   public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
-      SessionState session = state.getSessionState(id);
-      if (session == null) {
-         throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
-      }
-      // Don't let new consumers or producers get added while we are closing
-      // this down.
-      session.shutdown();
-      // Cascade the connection stop to the consumers and producers.
-      for (ConsumerId consumerId : session.getConsumerIds()) {
+         if (!txMap.containsKey(txId)) {
+            txMap.put(txId, info);
+         }
+         return null;
+      }
+
+      @Override
+      public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
          try {
-            processRemoveConsumer(consumerId, lastDeliveredSequenceId);
+            protocolManager.commitTransactionOnePhase(info);
+            TransactionId txId = info.getTransactionId();
+            txMap.remove(txId);
          }
-         catch (Throwable e) {
-            // LOG.warn("Failed to remove consumer: {}", consumerId, e);
+         catch (Exception e) {
+            e.printStackTrace();
+            throw e;
          }
+
+         return null;
       }
-      for (ProducerId producerId : session.getProducerIds()) {
+
+      @Override
+      public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+         protocolManager.commitTransactionTwoPhase(info);
+         TransactionId txId = info.getTransactionId();
+         txMap.remove(txId);
+
+         return null;
+      }
+
+      @Override
+      public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
+         //activemq5 keeps a var to remember only the faultTolerant flag
+         //this can be sent over a reconnected transport as the first command
+         //before restoring the connection.
+         return null;
+      }
+
+      @Override
+      public Response processConnectionError(ConnectionError arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processConsumerControl(ConsumerControl consumerControl) throws Exception {
+         //amq5 clients send this command to restore prefetchSize
+         //after successful reconnect
          try {
-            processRemoveProducer(producerId);
+            updateConsumer(consumerControl);
          }
-         catch (Throwable e) {
-            // LOG.warn("Failed to remove producer: {}", producerId, e);
+         catch (Exception e) {
+            //log error
          }
+         return null;
       }
-      state.removeSession(id);
-      protocolManager.removeSession(context, session.getInfo());
-      return null;
-   }
 
-   @Override
-   public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
-      protocolManager.removeSubscription(subInfo);
-      return null;
-   }
+      @Override
+      public Response processControlCommand(ControlCommand arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
 
-   @Override
-   public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-      protocolManager.rollbackTransaction(info);
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
-      return null;
-   }
+      @Override
+      public Response processEndTransaction(TransactionInfo info) throws Exception {
+         protocolManager.endTransaction(info);
+         TransactionId txId = info.getTransactionId();
 
-   @Override
-   public Response processShutdown(ShutdownInfo info) throws Exception {
-      return null;
-   }
+         if (!txMap.containsKey(txId)) {
+            txMap.put(txId, info);
+         }
+         return null;
+      }
 
-   @Override
-   public Response processWireFormat(WireFormatInfo arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
+      @Override
+      public Response processFlush(FlushCommand arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
 
-   public int getMaximumConsumersAllowedPerConnection() {
-      return 1000000;//this belongs to configuration, now hardcoded
-   }
+      @Override
+      public Response processForgetTransaction(TransactionInfo info) throws Exception {
+         TransactionId txId = info.getTransactionId();
+         txMap.remove(txId);
 
-   public int getMaximumProducersAllowedPerConnection() {
-      return 1000000;//this belongs to configuration, now hardcoded
-   }
+         protocolManager.forgetTransaction(info.getTransactionId());
+         return null;
+      }
 
-   public void deliverMessage(MessageDispatch dispatch) {
-      Message m = dispatch.getMessage();
-      if (m != null) {
-         long endTime = System.currentTimeMillis();
-         m.setBrokerOutTime(endTime);
+      @Override
+      public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
       }
 
-      protocolManager.send(this, dispatch);
-   }
+      @Override
+      public Response processMessage(Message messageSend) throws Exception {
+         ProducerId producerId = messageSend.getProducerId();
+         AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
+         final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
+         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+         boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
 
-   public WireFormat getMarshaller() {
-      return this.wireFormat;
-   }
+         AMQSession session = getSession(producerId.getParentId());
 
-   public void registerTempQueue(ActiveMQDestination queue) {
-      tempQueues.add(queue);
-   }
+         session.send(producerInfo, messageSend, sendProducerAck);
+         return null;
+      }
 
-   @Override
-   public void disconnect(String reason, boolean fail) {
-      destroy();
-   }
 
-   @Override
-   public void fail(ActiveMQException e, String message) {
-      destroy();
-   }
+      @Override
+      public Response processMessageAck(MessageAck ack) throws Exception {
+         AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
+         consumerBrokerExchange.acknowledge(ack);
+         return null;
+      }
 
-   public void setAdvisorySession(AMQSession amqSession) {
-      this.advisorySession = amqSession;
-   }
+      @Override
+      public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
+         return null;
+      }
 
-   public AMQSession getAdvisorySession() {
-      return this.advisorySession;
-   }
+      @Override
+      public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
+         return null;
+      }
+
+      @Override
+      public Response processMessagePull(MessagePull arg0) throws Exception {
+         AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
+         if (amqConsumerBrokerExchange == null) {
+            throw new IllegalStateException("Consumer does not exist");
+         }
+         amqConsumerBrokerExchange.processMessagePull(arg0);
+         return null;
+      }
+
+      @Override
+      public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+         protocolManager.prepareTransaction(info);
+         //activemq needs a rdonly response
+         return new IntegerResponse(XAResource.XA_RDONLY);
+      }
+
+      @Override
+      public Response processProducerAck(ProducerAck arg0) throws Exception {
+         // a broker doesn't do producers.. this shouldn't happen
+         return null;
+      }
+
+      @Override
+      public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+         Set<SessionId> sIds = state.getSessionIds();
+
+
+         List<TransactionId> recovered = new ArrayList<>();
+         if (sIds != null) {
+            for (SessionId sid : sIds) {
+               AMQSession s = sessions.get(sid);
+               if (s != null) {
+                  s.recover(recovered);
+               }
+            }
+         }
+
+         return new DataArrayResponse(recovered.toArray(new TransactionId[0]));
+      }
+
+      @Override
+      public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+         //we let protocol manager to handle connection add/remove
+         try {
+            protocolManager.removeConnection(state.getInfo(), null);
+         }
+         catch (Throwable e) {
+            // log
+         }
+         return null;
+      }
+
+      @Override
+      public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
+         SessionId sessionId = id.getParentId();
+         SessionState ss = state.getSessionState(sessionId);
+         if (ss == null) {
+            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
+         }
+         ConsumerState consumerState = ss.removeConsumer(id);
+         if (consumerState == null) {
+            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
+         }
+         ConsumerInfo info = consumerState.getInfo();
+         info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+
+         AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
+
+         consumerBrokerExchange.removeConsumer();
+
+         removeConsumerBrokerExchange(id);
+
+         return null;
+      }
 
-   public AMQConnectionContext getConext() {
-      return this.context;
    }
 
 }


Mime
View raw message