activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [56/58] [abbrv] activemq-artemis git commit: Progress on refactoring - first pass: using AbstractConnection on OpenWireConnection and adding more TODOs
Date Wed, 09 Mar 2016 19:43:19 GMT
Progress on refactoring - first pass: using AbstractConnection on OpenWireConnection and adding more TODOs


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9c2e2259
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9c2e2259
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9c2e2259

Branch: refs/heads/refactor-openwire
Commit: 9c2e2259e7d621e33c26ef7fb397d7963ac7683b
Parents: 2630dde
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Feb 23 21:52:21 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Mar 9 14:41:41 2016 -0500

----------------------------------------------------------------------
 .../protocol/AbstractRemotingConnection.java    |    5 +
 .../protocol/openwire/AMQTransactionImpl.java   |   14 +-
 .../protocol/openwire/OpenWireConnection.java   | 1100 +++++++-----------
 .../openwire/OpenWireProtocolManager.java       |   14 +-
 .../openwire/amq/AMQProducerBrokerExchange.java |   51 +-
 .../openwire/amq/AMQServerSessionFactory.java   |    9 +
 .../core/protocol/openwire/amq/AMQSession.java  |   13 +-
 .../openwire/impl/OpenWireServerCallback.java   |   75 ++
 8 files changed, 560 insertions(+), 721 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index b759ccc..ee2449b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -104,6 +104,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
       return transportConnection.getID();
    }
 
+
+   public String getLocalAddress() {
+      return transportConnection.getLocalAddress();
+   }
+
    @Override
    public String getRemoteAddress() {
       return transportConnection.getRemoteAddress();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
index e356522..bbd7e95 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
@@ -28,22 +28,10 @@ public class AMQTransactionImpl extends TransactionImpl {
 
    private boolean rollbackForClose = false;
 
-   public AMQTransactionImpl(StorageManager storageManager, int timeoutSeconds) {
-      super(storageManager, timeoutSeconds);
-   }
-
-   public AMQTransactionImpl(StorageManager storageManager) {
-      super(storageManager);
-   }
-
    public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) {
       super(xid, storageManager, timeoutSeconds);
    }
 
-   public AMQTransactionImpl(long id, Xid xid, StorageManager storageManager) {
-      super(id, xid, storageManager);
-   }
-
    @Override
    public RefsOperation createRefsOperation(Queue queue) {
       return new AMQrefsOperation(queue, storageManager);
@@ -55,6 +43,8 @@ public class AMQTransactionImpl extends TransactionImpl {
          super(queue, storageManager);
       }
 
+
+      // This is because the Rollbacks happen through the consumer, not through the server's
       @Override
       public void afterRollback(Transaction tx) {
          if (rollbackForClose) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index a6f0f34..dbbb59f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -39,6 +40,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@@ -50,6 +54,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -101,32 +106,22 @@ import org.apache.activemq.wireformat.WireFormat;
  * Represents an activemq connection.
  * ToDo: extends AbstractRemotingConnection
  */
-public class OpenWireConnection implements RemotingConnection, CommandVisitor, SecurityAuth {
+public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
 
    private final OpenWireProtocolManager protocolManager;
 
-   private final Connection transportConnection;
-
-   private final long creationTime;
-
-   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
-
    private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
 
    private boolean destroyed = false;
 
    private final Object sendLock = new Object();
 
-   private volatile boolean dataReceived;
-
    private final Acceptor acceptorUsed;
 
    private final OpenWireFormat wireFormat;
 
    private AMQConnectionContext context;
 
-   private boolean pendingStop;
-
    private Throwable stopError = null;
 
    private final AtomicBoolean stopping = new AtomicBoolean(false);
@@ -154,21 +149,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    public OpenWireConnection(Acceptor acceptorUsed,
                              Connection connection,
+                             Executor executor,
                              OpenWireProtocolManager openWireProtocolManager,
                              OpenWireFormat wf) {
+      super(connection, executor);
       this.protocolManager = openWireProtocolManager;
-      this.transportConnection = connection;
       this.acceptorUsed = acceptorUsed;
       this.wireFormat = wf;
-      this.creationTime = System.currentTimeMillis();
       this.defaultSocketURIString = connection.getLocalAddress();
    }
 
-   @Override
-   public boolean isWritable(ReadyListener callback) {
-      return transportConnection.isWritable(callback);
-   }
-
    // SecurityAuth implementation
    @Override
    public String getUsername() {
@@ -181,6 +171,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    // SecurityAuth implementation
    @Override
+   public RemotingConnection getRemotingConnection() {
+      return this;
+   }
+
+   // SecurityAuth implementation
+   @Override
    public String getPassword() {
       ConnectionInfo info = getConnectionInfo();
       if (info == null) {
@@ -200,18 +196,15 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       return info;
    }
 
-   public String getLocalAddress() {
-      return transportConnection.getLocalAddress();
-   }
-
    @Override
    public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+      super.bufferReceived(connectionID, buffer);
       try {
-         dataReceived = true;
+
+         // TODO-NOW: set OperationContext
 
          Command command = (Command) wireFormat.unmarshal(buffer);
 
-         //logger.log("got command: " + command);
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
          // the connection handles pings, negotiations directly.
@@ -223,36 +216,27 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
             // for some reason KeepAliveInfo.isResponseRequired() is always false
             protocolManager.sendReply(this, info);
          }
-         else if (command.getClass() == WireFormatInfo.class) {
-            // amq here starts a read/write monitor thread (detect ttl?)
-            negotiate((WireFormatInfo) command);
-         }
          else {
             Response response = null;
 
-            if (pendingStop) {
-               response = new ExceptionResponse(this.stopError);
+            try {
+               setLastCommand(command);
+               response = command.visit(new CommandProcessor());
             }
-            else {
-               try {
-                  setLastCommand(command);
-                  response = command.visit(this);
-               }
-               catch (Exception e) {
-                  if (responseRequired) {
-                     response = new ExceptionResponse(e);
-                  }
-               }
-               finally {
-                  setLastCommand(null);
+            catch (Exception e) {
+               if (responseRequired) {
+                  response = new ExceptionResponse(e);
                }
+            }
+            finally {
+               setLastCommand(null);
+            }
 
-               if (response instanceof ExceptionResponse) {
-                  if (!responseRequired) {
-                     Throwable cause = ((ExceptionResponse) response).getException();
-                     serviceException(cause);
-                     response = null;
-                  }
+            if (response instanceof ExceptionResponse) {
+               if (!responseRequired) {
+                  Throwable cause = ((ExceptionResponse) response).getException();
+                  serviceException(cause);
+                  response = null;
                }
             }
 
@@ -272,6 +256,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
                }
             }
 
+            // TODO-NOW: response through operation-context
+
             if (response != null && !protocolManager.isStopping()) {
                response.setCorrelationId(commandId);
                dispatchSync(response);
@@ -280,6 +266,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
          }
       }
       catch (IOException e) {
+
+         // TODO-NOW: send errors
          ActiveMQServerLogger.LOGGER.error("error decoding", e);
       }
       catch (Throwable t) {
@@ -293,135 +281,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       }
    }
 
-   private void negotiate(WireFormatInfo command) throws IOException {
-      this.wireFormat.renegotiateWireFormat(command);
-      //throw back a brokerInfo here
-      protocolManager.sendBrokerInfo(this);
-   }
-
-   @Override
-   public Object getID() {
-      return transportConnection.getID();
-   }
-
-   @Override
-   public long getCreationTime() {
-      return creationTime;
-   }
-
-   @Override
-   public String getRemoteAddress() {
-      return transportConnection.getRemoteAddress();
-   }
-
-   @Override
-   public void addFailureListener(FailureListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("FailureListener cannot be null");
-      }
-
-      failureListeners.add(listener);
-   }
-
-   @Override
-   public boolean removeFailureListener(FailureListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("FailureListener cannot be null");
-      }
-
-      return failureListeners.remove(listener);
-   }
-
-   @Override
-   public void addCloseListener(CloseListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("CloseListener cannot be null");
-      }
-
-      closeListeners.add(listener);
-   }
-
-   @Override
-   public boolean removeCloseListener(CloseListener listener) {
-      if (listener == null) {
-         throw new IllegalStateException("CloseListener cannot be null");
-      }
-
-      return closeListeners.remove(listener);
-   }
-
-   @Override
-   public List<CloseListener> removeCloseListeners() {
-      List<CloseListener> ret = new ArrayList<>(closeListeners);
-
-      closeListeners.clear();
-
-      return ret;
-   }
-
-   @Override
-   public void setCloseListeners(List<CloseListener> listeners) {
-      closeListeners.clear();
-
-      closeListeners.addAll(listeners);
-   }
-
-   @Override
-   public List<FailureListener> getFailureListeners() {
-      // we do not return the listeners otherwise the remoting service
-      // would NOT destroy the connection.
-      return Collections.emptyList();
-   }
-
-   @Override
-   public List<FailureListener> removeFailureListeners() {
-      List<FailureListener> ret = new ArrayList<>(failureListeners);
-
-      failureListeners.clear();
-
-      return ret;
-   }
-
-   @Override
-   public void setFailureListeners(List<FailureListener> listeners) {
-      failureListeners.clear();
-
-      failureListeners.addAll(listeners);
-   }
-
-   @Override
-   public ActiveMQBuffer createTransportBuffer(int size) {
-      return ActiveMQBuffers.dynamicBuffer(size);
-   }
-
-   @Override
-   public void fail(ActiveMQException me) {
-      fail(me, null);
-   }
-
    @Override
    public void destroy() {
       fail(null, null);
    }
 
-   private void deleteTempQueues() throws Exception {
-      Iterator<ActiveMQDestination> tmpQs = tempQueues.iterator();
-      while (tmpQs.hasNext()) {
-         ActiveMQDestination q = tmpQs.next();
-         protocolManager.removeDestination(this, q);
-      }
-   }
-
-   @Override
-   public RemotingConnection getRemotingConnection() {
-      return this;
-   }
-
-   @Override
-   public Connection getTransportConnection() {
-      return this.transportConnection;
-   }
-
    @Override
    public boolean isClient() {
       return false;
@@ -466,22 +330,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       }
    }
 
-   private void callClosingListeners() {
-      final List<CloseListener> listenersClone = new ArrayList<>(closeListeners);
-
-      for (final CloseListener listener : listenersClone) {
-         try {
-            listener.connectionClosed();
-         }
-         catch (final Throwable t) {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t);
-         }
-      }
-   }
-
    // throw a WireFormatInfo to the peer
    public void init() {
       WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
@@ -509,34 +357,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    }
 
-   @Override
-   public Response processAddConnection(ConnectionInfo info) throws Exception {
-      //let protoclmanager handle connection add/remove
-      try {
-         protocolManager.addConnection(this, info);
-      }
-      catch (Exception e) {
-         if (e instanceof SecurityException) {
-            // close this down - in case the peer of this transport doesn't play
-            // nice
-            delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
-         }
-         Response resp = new ExceptionResponse(e);
-         return resp;
-      }
-      if (info.isManageable() && protocolManager.isUpdateClusterClients()) {
-         // send ConnectionCommand
-         ConnectionControl command = protocolManager.newConnectionControl();
-         command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
-         if (info.isFailoverReconnect()) {
-            command.setRebalanceConnection(false);
-         }
-         dispatchAsync(command);
-      }
-      return null;
-
-   }
-
    public void dispatchAsync(Command message) {
       if (!stopping.get()) {
          dispatchSync(message);
@@ -564,7 +384,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    public void serviceExceptionAsync(final IOException e) {
       if (asyncException.compareAndSet(false, true)) {
-         // Why this is not through an executor?
+         // TODO: Why this is not through an executor?
          new Thread("Async Exception Handler") {
             @Override
             public void run() {
@@ -585,12 +405,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
          try {
             ConnectionError ce = new ConnectionError();
             ce.setException(e);
-            if (pendingStop) {
-               dispatchSync(ce);
-            }
-            else {
-               dispatchAsync(ce);
-            }
+            dispatchAsync(ce);
          }
          finally {
             inServiceException = false;
@@ -649,89 +464,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       }
    }
 
-   public void delayedStop(final int waitTime, final String reason, Throwable cause) {
-      if (waitTime > 0) {
-         synchronized (this) {
-            pendingStop = true;
-            stopError = cause;
-         }
-      }
-   }
-
-   public void stopAsync() {
-      // If we're in the middle of starting then go no further... for now.
-      synchronized (this) {
-         pendingStop = true;
-      }
-      if (stopping.compareAndSet(false, true)) {
-         if (context != null) {
-            context.getStopping().set(true);
-         }
-      }
-   }
-
-   protected void doStop() throws Exception {
-      /*
-       * What's a duplex bridge? try { synchronized (this) { if (duplexBridge !=
-       * null) { duplexBridge.stop(); } } } catch (Exception ignore) {
-       * LOG.trace("Exception caught stopping. This exception is ignored.",
-       * ignore); }
-       */
-      try {
-         getTransportConnection().close();
-      }
-      catch (Exception e) {
-         // log
-      }
-
-      // Run the MessageDispatch callbacks so that message references get
-      // cleaned up.
-      synchronized (dispatchQueue) {
-         for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
-            Command command = iter.next();
-            if (command.isMessageDispatch()) {
-               MessageDispatch md = (MessageDispatch) command;
-               TransmitCallback sub = md.getTransmitCallback();
-               protocolManager.postProcessDispatch(md);
-               if (sub != null) {
-                  sub.onFailure();
-               }
-            }
-         }
-         dispatchQueue.clear();
-      }
-      //
-      // Remove all logical connection associated with this connection
-      // from the broker.
-      if (!protocolManager.isStopped()) {
-         context.getStopping().set(true);
-         try {
-            processRemoveConnection(state.getInfo().getConnectionId(), 0L);
-         }
-         catch (Throwable ignore) {
-            ignore.printStackTrace();
-         }
-      }
-   }
-
-   @Override
-   public Response processAddConsumer(ConsumerInfo info) {
-      Response resp = null;
-      try {
-         protocolManager.addConsumer(this, info);
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
-      }
-      return resp;
-   }
-
    public void addConsumerBrokerExchange(ConsumerId id,
                                          AMQSession amqSession,
                                          Map<ActiveMQDestination, AMQConsumer> consumerMap) {
@@ -762,222 +494,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       }
    }
 
-   public int getConsumerCount() {
-      int result = 0;
-      for (SessionId sessionId : state.getSessionIds()) {
-         SessionState sessionState = state.getSessionState(sessionId);
-         if (sessionState != null) {
-            result += sessionState.getConsumerIds().size();
-         }
-      }
-      return result;
-   }
-
-   public int getProducerCount() {
-      int result = 0;
-      for (SessionId sessionId : state.getSessionIds()) {
-         SessionState sessionState = state.getSessionState(sessionId);
-         if (sessionState != null) {
-            result += sessionState.getProducerIds().size();
-         }
-      }
-      return result;
-   }
-
-   @Override
-   public Response processAddDestination(DestinationInfo dest) throws Exception {
-      Response resp = null;
-      try {
-         protocolManager.addDestination(this, dest);
-      }
-      catch (Exception e) {
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
-      }
-      return resp;
-   }
-
-   @Override
-   public Response processAddProducer(ProducerInfo info) throws Exception {
-      Response resp = null;
-      try {
-         protocolManager.addProducer(this, info);
-      }
-      catch (Exception e) {
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else if (e instanceof ActiveMQNonExistentQueueException) {
-            resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
-      }
-      return resp;
-   }
-
-   @Override
-   public Response processAddSession(SessionInfo info) throws Exception {
-      // Avoid replaying dup commands
-      if (!state.getSessionIds().contains(info.getSessionId())) {
-         protocolManager.addSession(this, info);
-         try {
-            state.addSession(info);
-         }
-         catch (IllegalStateException e) {
-            e.printStackTrace();
-            protocolManager.removeSession(context, info);
-         }
-      }
-      return null;
-   }
-
-   @Override
-   public Response processBeginTransaction(TransactionInfo info) throws Exception {
-      TransactionId txId = info.getTransactionId();
-
-      if (!txMap.containsKey(txId)) {
-         txMap.put(txId, info);
-      }
-      return null;
-   }
-
-   @Override
-   public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-      protocolManager.commitTransactionOnePhase(info);
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
-
-      return null;
-   }
-
-   @Override
-   public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-      protocolManager.commitTransactionTwoPhase(info);
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
-
-      return null;
-   }
-
-   @Override
-   public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
-      //activemq5 keeps a var to remember only the faultTolerant flag
-      //this can be sent over a reconnected transport as the first command
-      //before restoring the connection.
-      return null;
-   }
-
-   @Override
-   public Response processConnectionError(ConnectionError arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processConsumerControl(ConsumerControl consumerControl) throws Exception {
-      //amq5 clients send this command to restore prefetchSize
-      //after successful reconnect
-      try {
-         protocolManager.updateConsumer(this, consumerControl);
-      }
-      catch (Exception e) {
-         //log error
-      }
-      return null;
-   }
-
-   @Override
-   public Response processControlCommand(ControlCommand arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processEndTransaction(TransactionInfo info) throws Exception {
-      protocolManager.endTransaction(info);
-      TransactionId txId = info.getTransactionId();
-
-      if (!txMap.containsKey(txId)) {
-         txMap.put(txId, info);
-      }
-      return null;
-   }
-
-   @Override
-   public Response processFlush(FlushCommand arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processForgetTransaction(TransactionInfo info) throws Exception {
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
-
-      protocolManager.forgetTransaction(info.getTransactionId());
-      return null;
-   }
-
-   @Override
-   public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processMessage(Message messageSend) {
-      Response resp = null;
-      try {
-         ProducerId producerId = messageSend.getProducerId();
-         AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
-         final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
-         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
-         boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
-
-         AMQSession session = protocolManager.getSession(producerId.getParentId());
-
-         if (producerExchange.canDispatch(messageSend)) {
-            SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
-            if (result.isBlockNextSend()) {
-               if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
-                  throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
-               }
-
-               if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
-                  //in that case don't send the response
-                  //this will force the client to wait until
-                  //the response is got.
-                  context.setDontSendReponse(true);
-               }
-               else {
-                  //hang the connection until the space is available
-                  session.blockingWaitForSpace(producerExchange, result);
-               }
-            }
-            else if (sendProducerAck) {
-               ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-               this.dispatchAsync(ack);
-            }
-         }
-      }
-      catch (Throwable e) {
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
-      }
-      return resp;
-   }
-
    private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
       AMQProducerBrokerExchange result = producerExchanges.get(id);
       if (result == null) {
@@ -1007,163 +523,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       return result;
    }
 
-   @Override
-   public Response processMessageAck(MessageAck ack) throws Exception {
-      AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
-      consumerBrokerExchange.acknowledge(ack);
-      return null;
-   }
-
-   @Override
-   public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processMessagePull(MessagePull arg0) throws Exception {
-      AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
-      if (amqConsumerBrokerExchange == null) {
-         throw new IllegalStateException("Consumer does not exist");
-      }
-      amqConsumerBrokerExchange.processMessagePull(arg0);
-      return null;
-   }
-
-   @Override
-   public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-      protocolManager.prepareTransaction(info);
-      return null;
-   }
-
-   @Override
-   public Response processProducerAck(ProducerAck arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   @Override
-   public Response processRecoverTransactions(TransactionInfo info) throws Exception {
-      Set<SessionId> sIds = state.getSessionIds();
-      TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
-      return new DataArrayResponse(recovered);
-   }
-
-   @Override
-   public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
-      //we let protocol manager to handle connection add/remove
-      try {
-         protocolManager.removeConnection(this, state.getInfo(), null);
-      }
-      catch (Throwable e) {
-         // log
-      }
-      return null;
-   }
-
-   @Override
-   public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
-      SessionId sessionId = id.getParentId();
-      SessionState ss = state.getSessionState(sessionId);
-      if (ss == null) {
-         throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
-      }
-      ConsumerState consumerState = ss.removeConsumer(id);
-      if (consumerState == null) {
-         throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
-      }
-      ConsumerInfo info = consumerState.getInfo();
-      info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-
-      AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
-
-      consumerBrokerExchange.removeConsumer();
-
-      removeConsumerBrokerExchange(id);
-
-      return null;
-   }
-
    private void removeConsumerBrokerExchange(ConsumerId id) {
       synchronized (consumerExchanges) {
          consumerExchanges.remove(id);
       }
    }
 
-   @Override
-   public Response processRemoveDestination(DestinationInfo info) throws Exception {
-      ActiveMQDestination dest = info.getDestination();
-      protocolManager.removeDestination(this, dest);
-      return null;
-   }
-
-   @Override
-   public Response processRemoveProducer(ProducerId id) throws Exception {
-      protocolManager.removeProducer(id);
-      return null;
-   }
-
-   @Override
-   public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
-      SessionState session = state.getSessionState(id);
-      if (session == null) {
-         throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
-      }
-      // Don't let new consumers or producers get added while we are closing
-      // this down.
-      session.shutdown();
-      // Cascade the connection stop producers.
-      // we don't stop consumer because in core
-      // closing the session will do the job
-      for (ProducerId producerId : session.getProducerIds()) {
-         try {
-            processRemoveProducer(producerId);
-         }
-         catch (Throwable e) {
-            // LOG.warn("Failed to remove producer: {}", producerId, e);
-         }
-      }
-      state.removeSession(id);
-      protocolManager.removeSession(context, session.getInfo());
-      return null;
-   }
-
-   @Override
-   public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
-      protocolManager.removeSubscription(subInfo);
-      return null;
-   }
-
-   @Override
-   public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-      protocolManager.rollbackTransaction(info);
-      TransactionId txId = info.getTransactionId();
-      txMap.remove(txId);
-      return null;
-   }
-
-   @Override
-   public Response processShutdown(ShutdownInfo info) throws Exception {
-      this.shutdown(false);
-      return null;
-   }
-
-   @Override
-   public Response processWireFormat(WireFormatInfo arg0) throws Exception {
-      throw new IllegalStateException("not implemented! ");
-   }
-
-   public int getMaximumConsumersAllowedPerConnection() {
-      return 1000000;//this belongs to configuration, now hardcoded
-   }
-
-   public int getMaximumProducersAllowedPerConnection() {
-      return 1000000;//this belongs to configuration, now hardcoded
-   }
-
    public void deliverMessage(MessageDispatch dispatch) {
       Message m = dispatch.getMessage();
       if (m != null) {
@@ -1323,4 +688,393 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       context.setReconnect(true);
       context.incRefCount();
    }
+
+   // This will listen for commands throught the protocolmanager
+   class CommandProcessor implements CommandVisitor {
+
+      @Override
+      public Response processAddConnection(ConnectionInfo info) throws Exception {
+         //let protoclmanager handle connection add/remove
+         try {
+            protocolManager.addConnection(OpenWireConnection.this, info);
+         }
+         catch (Exception e) {
+            Response resp = new ExceptionResponse(e);
+            return resp;
+         }
+         if (info.isManageable() && protocolManager.isUpdateClusterClients()) {
+            // send ConnectionCommand
+            ConnectionControl command = protocolManager.newConnectionControl();
+            command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
+            if (info.isFailoverReconnect()) {
+               command.setRebalanceConnection(false);
+            }
+            dispatchAsync(command);
+         }
+         return null;
+
+      }
+
+      @Override
+      public Response processAddProducer(ProducerInfo info) throws Exception {
+         Response resp = null;
+         try {
+            protocolManager.addProducer(OpenWireConnection.this, info);
+         }
+         catch (Exception e) {
+            if (e instanceof ActiveMQSecurityException) {
+               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+            }
+            else if (e instanceof ActiveMQNonExistentQueueException) {
+               resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
+            }
+            else {
+               resp = new ExceptionResponse(e);
+            }
+         }
+         return resp;
+      }
+
+      @Override
+      public Response processAddConsumer(ConsumerInfo info) {
+         Response resp = null;
+         try {
+            protocolManager.addConsumer(OpenWireConnection.this, info);
+         }
+         catch (Exception e) {
+            e.printStackTrace();
+            if (e instanceof ActiveMQSecurityException) {
+               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+            }
+            else {
+               resp = new ExceptionResponse(e);
+            }
+         }
+         return resp;
+      }
+
+      @Override
+      public Response processRemoveDestination(DestinationInfo info) throws Exception {
+         ActiveMQDestination dest = info.getDestination();
+         protocolManager.removeDestination(OpenWireConnection.this, dest);
+         return null;
+      }
+
+      @Override
+      public Response processRemoveProducer(ProducerId id) throws Exception {
+         protocolManager.removeProducer(id);
+         return null;
+      }
+
+      @Override
+      public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
+         SessionState session = state.getSessionState(id);
+         if (session == null) {
+            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
+         }
+         // Don't let new consumers or producers get added while we are closing
+         // this down.
+         session.shutdown();
+         // Cascade the connection stop producers.
+         // we don't stop consumer because in core
+         // closing the session will do the job
+         for (ProducerId producerId : session.getProducerIds()) {
+            try {
+               processRemoveProducer(producerId);
+            }
+            catch (Throwable e) {
+               // LOG.warn("Failed to remove producer: {}", producerId, e);
+            }
+         }
+         state.removeSession(id);
+         protocolManager.removeSession(context, session.getInfo());
+         return null;
+      }
+
+      @Override
+      public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
+         protocolManager.removeSubscription(subInfo);
+         return null;
+      }
+
+      @Override
+      public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+         protocolManager.rollbackTransaction(info);
+         TransactionId txId = info.getTransactionId();
+         txMap.remove(txId);
+         return null;
+      }
+
+      @Override
+      public Response processShutdown(ShutdownInfo info) throws Exception {
+         OpenWireConnection.this.shutdown(false);
+         return null;
+      }
+
+      @Override
+      public Response processWireFormat(WireFormatInfo command) throws Exception {
+         wireFormat.renegotiateWireFormat(command);
+         //throw back a brokerInfo here
+         protocolManager.sendBrokerInfo(OpenWireConnection.this);
+         return null;
+      }
+
+      @Override
+      public Response processAddDestination(DestinationInfo dest) throws Exception {
+         Response resp = null;
+         try {
+            protocolManager.addDestination(OpenWireConnection.this, dest);
+         }
+         catch (Exception e) {
+            if (e instanceof ActiveMQSecurityException) {
+               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+            }
+            else {
+               resp = new ExceptionResponse(e);
+            }
+         }
+         return resp;
+      }
+
+      @Override
+      public Response processAddSession(SessionInfo info) throws Exception {
+         // Avoid replaying dup commands
+         if (!state.getSessionIds().contains(info.getSessionId())) {
+            protocolManager.addSession(OpenWireConnection.this, info);
+            try {
+               state.addSession(info);
+            }
+            catch (IllegalStateException e) {
+               e.printStackTrace();
+               protocolManager.removeSession(context, info);
+            }
+         }
+         return null;
+      }
+
+      @Override
+      public Response processBeginTransaction(TransactionInfo info) throws Exception {
+         TransactionId txId = info.getTransactionId();
+
+         if (!txMap.containsKey(txId)) {
+            txMap.put(txId, info);
+         }
+         return null;
+      }
+
+      @Override
+      public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+         protocolManager.commitTransactionOnePhase(info);
+         TransactionId txId = info.getTransactionId();
+         txMap.remove(txId);
+
+         return null;
+      }
+
+      @Override
+      public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+         protocolManager.commitTransactionTwoPhase(info);
+         TransactionId txId = info.getTransactionId();
+         txMap.remove(txId);
+
+         return null;
+      }
+
+      @Override
+      public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
+         //activemq5 keeps a var to remember only the faultTolerant flag
+         //this can be sent over a reconnected transport as the first command
+         //before restoring the connection.
+         return null;
+      }
+
+      @Override
+      public Response processConnectionError(ConnectionError arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processConsumerControl(ConsumerControl consumerControl) throws Exception {
+         //amq5 clients send this command to restore prefetchSize
+         //after successful reconnect
+         try {
+            protocolManager.updateConsumer(OpenWireConnection.this, consumerControl);
+         }
+         catch (Exception e) {
+            //log error
+         }
+         return null;
+      }
+
+      @Override
+      public Response processControlCommand(ControlCommand arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processEndTransaction(TransactionInfo info) throws Exception {
+         protocolManager.endTransaction(info);
+         TransactionId txId = info.getTransactionId();
+
+         if (!txMap.containsKey(txId)) {
+            txMap.put(txId, info);
+         }
+         return null;
+      }
+
+      @Override
+      public Response processFlush(FlushCommand arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processForgetTransaction(TransactionInfo info) throws Exception {
+         TransactionId txId = info.getTransactionId();
+         txMap.remove(txId);
+
+         protocolManager.forgetTransaction(info.getTransactionId());
+         return null;
+      }
+
+      @Override
+      public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processMessage(Message messageSend) {
+         Response resp = null;
+         try {
+            ProducerId producerId = messageSend.getProducerId();
+            AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
+            final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
+            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+            boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
+
+            AMQSession session = protocolManager.getSession(producerId.getParentId());
+
+            // TODO: canDispatch is always returning true;
+            if (producerExchange.canDispatch(messageSend)) {
+               SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
+               if (result.isBlockNextSend()) {
+                  if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
+                     // TODO see logging
+                     throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+                  }
+
+                  if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
+                     //in that case don't send the response
+                     //this will force the client to wait until
+                     //the response is got.
+                     context.setDontSendReponse(true);
+                  }
+                  else {
+                     //hang the connection until the space is available
+                     session.blockingWaitForSpace(producerExchange, result);
+                  }
+               }
+               else if (sendProducerAck) {
+                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                  OpenWireConnection.this.dispatchAsync(ack);
+               }
+            }
+         }
+         catch (Throwable e) {
+            if (e instanceof ActiveMQSecurityException) {
+               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+            }
+            else {
+               resp = new ExceptionResponse(e);
+            }
+         }
+         return resp;
+      }
+
+      @Override
+      public Response processMessageAck(MessageAck ack) throws Exception {
+         AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
+         consumerBrokerExchange.acknowledge(ack);
+         return null;
+      }
+
+      @Override
+      public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processMessagePull(MessagePull arg0) throws Exception {
+         AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
+         if (amqConsumerBrokerExchange == null) {
+            throw new IllegalStateException("Consumer does not exist");
+         }
+         amqConsumerBrokerExchange.processMessagePull(arg0);
+         return null;
+      }
+
+      @Override
+      public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+         protocolManager.prepareTransaction(info);
+         return null;
+      }
+
+      @Override
+      public Response processProducerAck(ProducerAck arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
+      public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+         Set<SessionId> sIds = state.getSessionIds();
+         TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
+         return new DataArrayResponse(recovered);
+      }
+
+      @Override
+      public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+         //we let protocol manager to handle connection add/remove
+         try {
+            protocolManager.removeConnection(OpenWireConnection.this, state.getInfo(), null);
+         }
+         catch (Throwable e) {
+            // log
+         }
+         return null;
+      }
+
+      @Override
+      public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
+         SessionId sessionId = id.getParentId();
+         SessionState ss = state.getSessionState(sessionId);
+         if (ss == null) {
+            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
+         }
+         ConsumerState consumerState = ss.removeConsumer(id);
+         if (consumerState == null) {
+            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
+         }
+         ConsumerInfo info = consumerState.getInfo();
+         info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+
+         AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
+
+         consumerBrokerExchange.removeConsumer();
+
+         removeConsumerBrokerExchange(id);
+
+         return null;
+      }
+
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 014181d..440fcce 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -219,7 +219,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
-      OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
+      OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection,  server.getExecutorFactory().getExecutor(), this, wf);
       owConn.init();
 
       // TODO CLEBERT What is this constant here? we should get it from TTL initial pings
@@ -506,9 +506,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
          ActiveMQDestination destination = info.getDestination();
          if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
-            if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
-               throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
-            }
             if (destination.isQueue()) {
                OpenWireUtil.validateDestination(destination, amqSession);
             }
@@ -549,12 +546,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
       // Avoid replaying dup commands
       if (!ss.getConsumerIds().contains(info.getConsumerId())) {
-         ActiveMQDestination destination = info.getDestination();
-         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
-            if (theConn.getConsumerCount() >= theConn.getMaximumConsumersAllowedPerConnection()) {
-               throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumConsumersAllowedPerConnection());
-            }
-         }
 
          AMQSession amqSession = sessions.get(sessionId);
          if (amqSession == null) {
@@ -760,6 +751,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       transactions.remove(xid);
    }
 
+   /**
+    * TODO: remove this, use the regular ResourceManager from the Server's
+    * */
    public void registerTx(TransactionId txId, AMQSession amqSession) {
       transactions.put(txId, amqSession);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index f94c119..e9c4044 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -29,8 +29,6 @@ public class AMQProducerBrokerExchange {
    private ProducerState producerState;
    private boolean mutable = true;
    private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
-   private boolean auditProducerSequenceIds;
-   private boolean isNetworkProducer;
    private final FlowControlInfo flowControlInfo = new FlowControlInfo();
 
    public AMQProducerBrokerExchange() {
@@ -92,29 +90,34 @@ public class AMQProducerBrokerExchange {
     * @return false if message should be ignored as a duplicate
     */
    public boolean canDispatch(Message messageSend) {
+      // TODO: auditProduceSequenceIds is never true
       boolean canDispatch = true;
-      if (auditProducerSequenceIds && messageSend.isPersistent()) {
-         final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
-         if (isNetworkProducer) {
-            // messages are multiplexed on this producer so we need to query the
-            // persistenceAdapter
-            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
-            if (producerSequenceId <= lastStoredForMessageProducer) {
-               canDispatch = false;
-            }
-         }
-         else if (producerSequenceId <= lastSendSequenceNumber.get()) {
-            canDispatch = false;
-            if (messageSend.isInTransaction()) {
-            }
-            else {
-            }
-         }
-         else {
-            // track current so we can suppress duplicates later in the stream
-            lastSendSequenceNumber.set(producerSequenceId);
-         }
-      }
+      //TODO: DEAD CODE
+//      if (auditProducerSequenceIds && messageSend.isPersistent()) {
+//         final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
+//         if (isNetworkProducer) {
+//            // messages are multiplexed on this producer so we need to query the
+//            // persistenceAdapter
+//            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
+//            if (producerSequenceId <= lastStoredForMessageProducer) {
+//               canDispatch = false;
+//            }
+//         }
+//         else if (producerSequenceId <= lastSendSequenceNumber.get()) {
+//            canDispatch = false;
+//            // TODO: WHAT IS THIS?
+//            if (messageSend.isInTransaction()) {
+//
+//
+//            }
+//            else {
+//            }
+//         }
+//         else {
+//            // track current so we can suppress duplicates later in the stream
+//            lastSendSequenceNumber.set(producerSequenceId);
+//         }
+//      }
       return canDispatch;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
index 9ce21e3..a6ca4a0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
@@ -32,6 +32,15 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public class AMQServerSessionFactory implements ServerSessionFactory {
 
+   private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory();
+
+   public static AMQServerSessionFactory getInstance() {
+      return singleInstance;
+   }
+
+   private AMQServerSessionFactory() {
+   }
+
    @Override
    public ServerSessionImpl createCoreSession(String name,
                                               String username,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index fe8d3c4..83709a1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -63,8 +63,8 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
-   private AMQServerSession coreSession;
    private ConnectionInfo connInfo;
+   private AMQServerSession coreSession;
    private SessionInfo sessInfo;
    private ActiveMQServer server;
    private OpenWireConnection connection;
@@ -91,6 +91,7 @@ public class AMQSession implements SessionCallback {
                      OpenWireProtocolManager manager) {
       this.connInfo = connInfo;
       this.sessInfo = sessInfo;
+
       this.server = server;
       this.connection = connection;
       this.scheduledPool = scheduledPool;
@@ -107,7 +108,7 @@ public class AMQSession implements SessionCallback {
       // now
 
       try {
-         coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, new AMQServerSessionFactory(), true);
+         coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true);
 
          long sessionId = sessInfo.getSessionId().getValue();
          if (sessionId == -1) {
@@ -144,6 +145,7 @@ public class AMQSession implements SessionCallback {
       }
       connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
 
+      // TODO: This is wrong. We should only start when the client starts
       coreSession.start();
       started.set(true);
    }
@@ -276,6 +278,9 @@ public class AMQSession implements SessionCallback {
          coreMsg.setAddress(address);
 
          PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
+
+
+         // TODO: Improve this, tested with ProducerFlowControlSendFailTest
          if (store.isFull()) {
             result.setBlockNextSend(true);
             result.setBlockPagingStore(store);
@@ -521,8 +526,12 @@ public class AMQSession implements SessionCallback {
       }
    }
 
+   // TODO: remove this, we should do the same as we do on core for blocking
    public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
                                     SendingResult result) throws IOException {
+
+
+      new Exception("blocking").printStackTrace();
       long start = System.currentTimeMillis();
       long nextWarn = start;
       producerExchange.blockingOnFlowControl(true);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c2e2259/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
new file mode 100644
index 0000000..8ab3815
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+
+public class OpenWireServerCallback implements SessionCallback {
+
+   @Override
+   public boolean hasCredits(ServerConsumer consumerID) {
+      return false;
+   }
+
+   @Override
+   public void sendProducerCreditsMessage(int credits, SimpleString address) {
+
+   }
+
+   @Override
+   public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
+
+   }
+
+   @Override
+   public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) {
+      return 0;
+   }
+
+   @Override
+   public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) {
+      return 0;
+   }
+
+   @Override
+   public int sendLargeMessageContinuation(ServerConsumer consumerID,
+                                           byte[] body,
+                                           boolean continues,
+                                           boolean requiresResponse) {
+      return 0;
+   }
+
+   @Override
+   public void closed() {
+
+   }
+
+   @Override
+   public void disconnect(ServerConsumer consumerId, String queueName) {
+
+   }
+
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return false;
+   }
+}


Mime
View raw message