activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-186, ARTEMIS-188 and some refactoring
Date Thu, 06 Aug 2015 14:17:34 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 5c7720dba -> d4de65041


ARTEMIS-186, ARTEMIS-188 and some refactoring

refactoring and simplifying some of the connection state code and removing stuff we dont need. Also removed some of maps used and removed the need for lots of lookups

https://issues.apache.org/jira/browse/ARTEMIS-186

Added message pull support for zero prefetch consumers

https://issues.apache.org/jira/browse/ARTEMIS-188

Added consumer flow control


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/81756739
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/81756739
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/81756739

Branch: refs/heads/master
Commit: 81756739bbca82e4a213f5e9a194999fc6c38b2c
Parents: 5c7720d
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Wed Aug 5 12:30:21 2015 +0100
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Thu Aug 6 09:49:46 2015 +0100

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 292 +++++--------------
 .../openwire/OpenWireProtocolManager.java       |  59 ++--
 .../amq/AMQCompositeConsumerBrokerExchange.java |  61 ++++
 .../core/protocol/openwire/amq/AMQConsumer.java | 145 +++++++--
 .../openwire/amq/AMQConsumerBrokerExchange.java |  17 +-
 .../AMQMapTransportConnectionStateRegister.java | 151 ----------
 .../core/protocol/openwire/amq/AMQSession.java  |  49 ++--
 .../amq/AMQSingleConsumerBrokerExchange.java    |  46 +++
 ...QSingleTransportConnectionStateRegister.java | 184 ------------
 .../amq/AMQTransportConnectionState.java        |  85 ------
 .../AMQTransportConnectionStateRegister.java    |  59 ----
 .../core/server/impl/ServerSessionImpl.java     |  12 +
 12 files changed, 382 insertions(+), 778 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 674055b..e465ba7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,6 +40,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerInfo;
@@ -77,14 +80,10 @@ import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMapTransportConnectionStateRegister;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleTransportConnectionStateRegister;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionStateRegister;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -129,8 +128,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
 
    private OpenWireFormat wireFormat;
 
-   private AMQTransportConnectionStateRegister connectionStateRegister = new AMQSingleTransportConnectionStateRegister();
-
    private boolean faultTolerantConnection;
 
    private AMQConnectionContext context;
@@ -175,12 +172,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, AMQConsumerBrokerExchange>();
    private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, AMQProducerBrokerExchange>();
 
-   private AMQTransportConnectionState state;
+   private ConnectionState state;
 
    private final Set<String> tempQueues = new ConcurrentHashSet<String>();
 
-   protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
-
    private DataInputWrapper dataInput = new DataInputWrapper();
 
    private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
@@ -194,7 +189,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       this.transportConnection = connection;
       this.acceptorUsed = new AMQConnectorImpl(acceptorUsed);
       this.wireFormat = wf;
-      brokerConnectionStates = protocolManager.getConnectionStates();
       this.creationTime = System.currentTimeMillis();
    }
 
@@ -299,7 +293,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
                      context.setDontSendReponse(false);
                      response = null;
                   }
-                  context = null;
                }
 
                if (response != null && !protocolManager.isStopping())
@@ -621,38 +614,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
          info.setClientMaster(true);
       }
 
-      // Make sure 2 concurrent connections by the same ID only generate 1
-      // TransportConnectionState object.
-      synchronized (brokerConnectionStates)
-      {
-         state = (AMQTransportConnectionState) brokerConnectionStates.get(info
-               .getConnectionId());
-         if (state == null)
-         {
-            state = new AMQTransportConnectionState(info, this);
-            brokerConnectionStates.put(info.getConnectionId(), state);
-         }
-         state.incrementReference();
-      }
-      // If there are 2 concurrent connections for the same connection id,
-      // then last one in wins, we need to sync here
-      // to figure out the winner.
-      synchronized (state.getConnectionMutex())
-      {
-         if (state.getConnection() != this)
-         {
-            state.getConnection().disconnect(true);
-            state.setConnection(this);
-            state.reset(info);
-         }
-      }
+      state = new ConnectionState(info);
+
+      context = new AMQConnectionContext();
+
+      state.reset(info);
 
-      registerConnectionState(info.getConnectionId(), state);
 
       this.faultTolerantConnection = info.isFaultTolerant();
       // Setup the context.
       String clientId = info.getClientId();
-      context = new AMQConnectionContext();
       context.setBroker(protocolManager);
       context.setClientId(clientId);
       context.setClientMaster(info.isClientMaster());
@@ -671,8 +642,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       context.setReconnect(info.isFailoverReconnect());
       this.manageable = info.isManageable();
       context.setConnectionState(state);
-      state.setContext(context);
-      state.setConnection(this);
       if (info.getClientIp() == null)
       {
          info.setClientIp(getRemoteAddress());
@@ -684,12 +653,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       }
       catch (Exception e)
       {
-         synchronized (brokerConnectionStates)
-         {
-            brokerConnectionStates.remove(info.getConnectionId());
-         }
-         unregisterConnectionState(info.getConnectionId());
-
          if (e instanceof SecurityException)
          {
             // close this down - in case the peer of this transport doesn't play
@@ -926,28 +889,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       return this.messageAuthorizationPolicy;
    }
 
-   protected synchronized AMQTransportConnectionState unregisterConnectionState(
-         ConnectionId connectionId)
-   {
-      return connectionStateRegister.unregisterConnectionState(connectionId);
-   }
-
-   protected synchronized AMQTransportConnectionState registerConnectionState(
-         ConnectionId connectionId, AMQTransportConnectionState state)
-   {
-      AMQTransportConnectionState cs = null;
-      if (!connectionStateRegister.isEmpty()
-            && !connectionStateRegister.doesHandleMultipleConnectionStates())
-      {
-         // swap implementations
-         AMQTransportConnectionStateRegister newRegister = new AMQMapTransportConnectionStateRegister();
-         newRegister.intialize(connectionStateRegister);
-         connectionStateRegister = newRegister;
-      }
-      cs = connectionStateRegister.registerConnectionState(connectionId, state);
-      return cs;
-   }
-
    public void delayedStop(final int waitTime, final String reason,
          Throwable cause)
    {
@@ -997,16 +938,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       }
       if (stopping.compareAndSet(false, true))
       {
-         // Let all the connection contexts know we are shutting down
-         // so that in progress operations can notice and unblock.
-         List<AMQTransportConnectionState> connectionStates = listConnectionStates();
-         for (AMQTransportConnectionState cs : connectionStates)
+         if (context != null)
          {
-            AMQConnectionContext connectionContext = cs.getContext();
-            if (connectionContext != null)
-            {
-               connectionContext.getStopping().set(true);
-            }
+            context.getStopping().set(true);
          }
          try
          {
@@ -1040,11 +974,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       }
    }
 
-   protected synchronized List<AMQTransportConnectionState> listConnectionStates()
-   {
-      return connectionStateRegister.listConnectionStates();
-   }
-
    protected void doStop() throws Exception
    {
       this.acceptorUsed.onStopped(this);
@@ -1095,19 +1024,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       // from the broker.
       if (!protocolManager.isStopped())
       {
-         List<AMQTransportConnectionState> connectionStates = listConnectionStates();
-         connectionStates = listConnectionStates();
-         for (AMQTransportConnectionState cs : connectionStates)
+         context.getStopping().set(true);
+         try
          {
-            cs.getContext().getStopping().set(true);
-            try
-            {
-               processRemoveConnection(cs.getInfo().getConnectionId(), 0L);
-            }
-            catch (Throwable ignore)
-            {
-               ignore.printStackTrace();
-            }
+            processRemoveConnection(state.getInfo().getConnectionId(), 0L);
+         }
+         catch (Throwable ignore)
+         {
+            ignore.printStackTrace();
          }
       }
    }
@@ -1134,16 +1058,21 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       return resp;
    }
 
-   AMQConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id)
+   public void addConsumerBrokerExchange(ConsumerId id, AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap)
    {
       AMQConsumerBrokerExchange result = consumerExchanges.get(id);
       if (result == null)
       {
+         if (consumerMap.size() == 1)
+         {
+            result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next());
+         }
+         else
+         {
+            result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap);
+         }
          synchronized (consumerExchanges)
          {
-            result = new AMQConsumerBrokerExchange();
-            AMQTransportConnectionState state = lookupConnectionState(id);
-            context = state.getContext();
             result.setConnectionContext(context);
             SessionState ss = state.getSessionState(id.getParentId());
             if (ss != null)
@@ -1165,63 +1094,36 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
             consumerExchanges.put(id, result);
          }
       }
-      return result;
-   }
-
-   protected synchronized AMQTransportConnectionState lookupConnectionState(
-         ConsumerId id)
-   {
-      return connectionStateRegister.lookupConnectionState(id);
-   }
-
-   protected synchronized AMQTransportConnectionState lookupConnectionState(
-         ProducerId id)
-   {
-      return connectionStateRegister.lookupConnectionState(id);
    }
 
-   public int getConsumerCount(ConnectionId connectionId)
+   public int getConsumerCount()
    {
       int result = 0;
-      AMQTransportConnectionState cs = lookupConnectionState(connectionId);
-      if (cs != null)
+      for (SessionId sessionId : state.getSessionIds())
       {
-         for (SessionId sessionId : cs.getSessionIds())
+         SessionState sessionState = state.getSessionState(sessionId);
+         if (sessionState != null)
          {
-            SessionState sessionState = cs.getSessionState(sessionId);
-            if (sessionState != null)
-            {
-               result += sessionState.getConsumerIds().size();
-            }
+            result += sessionState.getConsumerIds().size();
          }
       }
       return result;
    }
 
-   public int getProducerCount(ConnectionId connectionId)
+   public int getProducerCount()
    {
       int result = 0;
-      AMQTransportConnectionState cs = lookupConnectionState(connectionId);
-      if (cs != null)
+      for (SessionId sessionId : state.getSessionIds())
       {
-         for (SessionId sessionId : cs.getSessionIds())
+         SessionState sessionState = state.getSessionState(sessionId);
+         if (sessionState != null)
          {
-            SessionState sessionState = cs.getSessionState(sessionId);
-            if (sessionState != null)
-            {
-               result += sessionState.getProducerIds().size();
-            }
+            result += sessionState.getProducerIds().size();
          }
       }
       return result;
    }
 
-   public synchronized AMQTransportConnectionState lookupConnectionState(
-         ConnectionId connectionId)
-   {
-      return connectionStateRegister.lookupConnectionState(connectionId);
-   }
-
    @Override
    public Response processAddDestination(DestinationInfo dest) throws Exception
    {
@@ -1273,20 +1175,18 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    @Override
    public Response processAddSession(SessionInfo info) throws Exception
    {
-      ConnectionId connectionId = info.getSessionId().getParentId();
-      AMQTransportConnectionState cs = lookupConnectionState(connectionId);
       // Avoid replaying dup commands
-      if (cs != null && !cs.getSessionIds().contains(info.getSessionId()))
+      if (!state.getSessionIds().contains(info.getSessionId()))
       {
          protocolManager.addSession(this, info);
          try
          {
-            cs.addSession(info);
+            state.addSession(info);
          }
          catch (IllegalStateException e)
          {
             e.printStackTrace();
-            protocolManager.removeSession(cs.getContext(), info);
+            protocolManager.removeSession(context, info);
          }
       }
       return null;
@@ -1422,10 +1322,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
                   //in that case don't send the response
                   //this will force the client to wait until
                   //the response is got.
-                  if (context == null)
-                  {
-                     this.context = new AMQConnectionContext();
-                  }
                   context.setDontSendReponse(true);
                }
                else
@@ -1463,9 +1359,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
          synchronized (producerExchanges)
          {
             result = new AMQProducerBrokerExchange();
-            AMQTransportConnectionState state = lookupConnectionState(id);
-            context = state.getContext();
             result.setConnectionContext(context);
+            //todo implement reconnect https://issues.apache.org/jira/browse/ARTEMIS-194
             if (context.isReconnect()
                   || (context.isNetworkConnection() && this.acceptorUsed
                         .isAuditNetworkProducers()))
@@ -1490,20 +1385,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
             producerExchanges.put(id, result);
          }
       }
-      else
-      {
-         context = result.getConnectionContext();
-      }
       return result;
    }
 
    @Override
    public Response processMessageAck(MessageAck ack) throws Exception
    {
-      ConsumerId consumerId = ack.getConsumerId();
-      SessionId sessionId = consumerId.getParentId();
-      AMQSession session = protocolManager.getSession(sessionId);
-      session.acknowledge(ack);
+      AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
+      consumerBrokerExchange.acknowledge(ack);
       return null;
    }
 
@@ -1523,7 +1412,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    @Override
    public Response processMessagePull(MessagePull arg0) throws Exception
    {
-      throw new IllegalStateException("not implemented! ");
+      AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
+      if (amqConsumerBrokerExchange == null)
+      {
+         throw new IllegalStateException("Consumer does not exist");
+      }
+      amqConsumerBrokerExchange.processMessagePull(arg0);
+      return null;
    }
 
    @Override
@@ -1542,8 +1437,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    @Override
    public Response processRecoverTransactions(TransactionInfo info) throws Exception
    {
-      AMQTransportConnectionState cs = lookupConnectionState(info.getConnectionId());
-      Set<SessionId> sIds = cs.getSessionIds();
+      Set<SessionId> sIds = state.getSessionIds();
       TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
       return new DataArrayResponse(recovered);
    }
@@ -1552,48 +1446,31 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    public Response processRemoveConnection(ConnectionId id,
          long lastDeliveredSequenceId) throws Exception
    {
-      AMQTransportConnectionState cs = lookupConnectionState(id);
-      if (cs != null)
+      // Don't allow things to be added to the connection state while we
+      // are shutting down.
+      state.shutdown();
+      // Cascade the connection stop to the sessions.
+      for (SessionId sessionId : state.getSessionIds())
       {
-         // Don't allow things to be added to the connection state while we
-         // are shutting down.
-         cs.shutdown();
-         // Cascade the connection stop to the sessions.
-         for (SessionId sessionId : cs.getSessionIds())
-         {
-            try
-            {
-               processRemoveSession(sessionId, lastDeliveredSequenceId);
-            }
-            catch (Throwable e)
-            {
-               // LOG
-            }
-         }
-
          try
          {
-            protocolManager.removeConnection(cs.getContext(), cs.getInfo(),
-                  null);
+            processRemoveSession(sessionId, lastDeliveredSequenceId);
          }
          catch (Throwable e)
          {
-            // log
-         }
-         AMQTransportConnectionState state = unregisterConnectionState(id);
-         if (state != null)
-         {
-            synchronized (brokerConnectionStates)
-            {
-               // If we are the last reference, we should remove the state
-               // from the broker.
-               if (state.decrementReference() == 0)
-               {
-                  brokerConnectionStates.remove(id);
-               }
-            }
+            // LOG
          }
       }
+
+      try
+      {
+         protocolManager.removeConnection(context, state.getInfo(),
+               null);
+      }
+      catch (Throwable e)
+      {
+         // log
+      }
       return null;
    }
 
@@ -1602,15 +1479,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
          long lastDeliveredSequenceId) throws Exception
    {
       SessionId sessionId = id.getParentId();
-      ConnectionId connectionId = sessionId.getParentId();
-      AMQTransportConnectionState cs = lookupConnectionState(connectionId);
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot remove a consumer from a connection that had not been registered: "
-                     + connectionId);
-      }
-      SessionState ss = cs.getSessionState(sessionId);
+      SessionState ss = state.getSessionState(sessionId);
       if (ss == null)
       {
          throw new IllegalStateException(
@@ -1625,8 +1494,13 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       }
       ConsumerInfo info = consumerState.getInfo();
       info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-      protocolManager.removeConsumer(cs.getContext(), consumerState.getInfo());
+
+      AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
+
+      consumerBrokerExchange.removeConsumer();
+
       removeConsumerBrokerExchange(id);
+
       return null;
    }
 
@@ -1661,15 +1535,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    public Response processRemoveSession(SessionId id,
          long lastDeliveredSequenceId) throws Exception
    {
-      ConnectionId connectionId = id.getParentId();
-      AMQTransportConnectionState cs = lookupConnectionState(connectionId);
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot remove session from connection that had not been registered: "
-                     + connectionId);
-      }
-      SessionState session = cs.getSessionState(id);
+      SessionState session = state.getSessionState(id);
       if (session == null)
       {
          throw new IllegalStateException(
@@ -1701,8 +1567,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
             // LOG.warn("Failed to remove producer: {}", producerId, e);
          }
       }
-      cs.removeSession(id);
-      protocolManager.removeSession(cs.getContext(), session.getInfo());
+      state.removeSession(id);
+      protocolManager.removeSession(context, session.getInfo());
       return null;
    }
 
@@ -1790,7 +1656,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
 
    public AMQConnectionContext getConext()
    {
-      return this.state.getContext();
+      return this.context;
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 8e7d31c..1a4386d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
 
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.advisory.AdvisorySupport;
@@ -55,6 +56,7 @@ import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -69,7 +71,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransportConnectionState;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -114,8 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    protected final ProducerId advisoryProducerId = new ProducerId();
 
    // from broker
-   protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections
-      .synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
+   protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections
+      .synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
 
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
 
@@ -131,6 +132,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
 
+   private final ScheduledExecutorService scheduledPool;
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
    {
       this.factory = factory;
@@ -141,6 +144,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       brokerState = new BrokerState();
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
       ManagementService service = server.getManagementService();
+      scheduledPool = server.getScheduledPool();
       if (service != null)
       {
          service.addNotificationListener(this);
@@ -239,7 +243,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    }
 
    public void handleCommand(OpenWireConnection openWireConnection,
-                             Object command)
+                             Object command) throws Exception
    {
       Command amqCmd = (Command) command;
       byte type = amqCmd.getDataStructureType();
@@ -252,6 +256,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
              * failover and load balancing.  These features are not yet implemented for Artemis OpenWire.  Instead we
              * simply drop the packet.  See: ACTIVEMQ6-108 */
             break;
+         case CommandTypes.MESSAGE_PULL:
+            MessagePull messagePull = (MessagePull) amqCmd;
+            openWireConnection.processMessagePull(messagePull);
+            break;
          case CommandTypes.CONSUMER_CONTROL:
             break;
          default:
@@ -306,11 +314,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
    }
 
-   public Map<ConnectionId, ConnectionState> getConnectionStates()
-   {
-      return this.brokerConnectionStates;
-   }
-
    public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception
    {
       String username = info.getUserName();
@@ -483,8 +486,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    {
       SessionId sessionId = info.getProducerId().getParentId();
       ConnectionId connectionId = sessionId.getParentId();
-      AMQTransportConnectionState cs = theConn
-         .lookupConnectionState(connectionId);
+      ConnectionState cs = theConn.getState();
       if (cs == null)
       {
          throw new IllegalStateException(
@@ -505,7 +507,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
          if (destination != null
             && !AdvisorySupport.isAdvisoryTopic(destination))
          {
-            if (theConn.getProducerCount(connectionId) >= theConn
+            if (theConn.getProducerCount() >= theConn
                .getMaximumProducersAllowedPerConnection())
             {
                throw new IllegalStateException(
@@ -541,8 +543,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       // Todo: add a destination interceptors holder here (amq supports this)
       SessionId sessionId = info.getConsumerId().getParentId();
       ConnectionId connectionId = sessionId.getParentId();
-      AMQTransportConnectionState cs = theConn
-         .lookupConnectionState(connectionId);
+      ConnectionState cs = theConn.getState();
       if (cs == null)
       {
          throw new IllegalStateException(
@@ -564,7 +565,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
          if (destination != null
             && !AdvisorySupport.isAdvisoryTopic(destination))
          {
-            if (theConn.getConsumerCount(connectionId) >= theConn
+            if (theConn.getConsumerCount() >= theConn
                .getMaximumConsumersAllowedPerConnection())
             {
                throw new IllegalStateException(
@@ -580,17 +581,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
             throw new IllegalStateException("Session not exist! : " + sessionId);
          }
 
-         amqSession.createConsumer(info);
+         amqSession.createConsumer(info, amqSession);
 
-         try
-         {
-            ss.addConsumer(info);
-            theConn.addConsumerBrokerExchange(info.getConsumerId());
-         }
-         catch (IllegalStateException e)
-         {
-            amqSession.removeConsumer(info);
-         }
+         ss.addConsumer(info);
       }
    }
 
@@ -614,7 +607,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
                                 boolean internal)
    {
       AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss,
-                                             server, theConn, this);
+                                             server, theConn, scheduledPool, this);
       amqSession.initialize();
       amqSession.setInternal(internal);
       sessions.put(ss.getSessionId(), amqSession);
@@ -644,13 +637,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
    }
 
-   public void removeConsumer(AMQConnectionContext context, ConsumerInfo info) throws Exception
-   {
-      SessionId sessionId = info.getConsumerId().getParentId();
-      AMQSession session = sessions.get(sessionId);
-      session.removeConsumer(info);
-   }
-
    public void removeProducer(ProducerId id)
    {
       SessionId sessionId = id.getParentId();
@@ -677,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       {
          SimpleString qName = new SimpleString("jms.queue."
                                                   + dest.getPhysicalName());
-         ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId());
+         ConnectionState state = connection.getState();
          ConnectionInfo connInfo = state.getInfo();
          if (connInfo != null)
          {
@@ -849,12 +835,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       {
          ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
          ConnectionId connId = sessionId.getParentId();
-         AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId);
-         OpenWireConnection conn = cc.getConnection();
+         OpenWireConnection cc = this.brokerConnectionStates.get(connId);
          ActiveMQMessage advisoryMessage = new ActiveMQMessage();
          advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
 
-         fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId());
+         fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
new file mode 100644
index 0000000..7fe3685
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessagePull;
+
+import java.util.Map;
+
+public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange
+{
+
+   private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
+
+   public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap)
+   {
+      super(amqSession);
+      this.consumerMap = consumerMap;
+   }
+
+   public void processMessagePull(MessagePull messagePull) throws Exception
+   {
+      AMQConsumer amqConsumer = consumerMap.get(messagePull.getDestination());
+      if (amqConsumer != null)
+      {
+         amqConsumer.processMessagePull(messagePull);
+      }
+   }
+
+   public void acknowledge(MessageAck ack) throws Exception
+   {
+      AMQConsumer amqConsumer = consumerMap.get(ack.getDestination());
+      if (amqConsumer != null)
+      {
+         amqSession.acknowledge(ack, amqConsumer);
+      }
+   }
+
+   public void removeConsumer() throws Exception
+   {
+      for (AMQConsumer amqConsumer : consumerMap.values())
+      {
+         amqConsumer.removeConsumer();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 1292aee..59f6d26 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -21,13 +21,19 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -42,20 +48,28 @@ public class AMQConsumer implements BrowserListener
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination actualDest;
    private ConsumerInfo info;
+   private final ScheduledExecutorService scheduledPool;
    private long nativeId = -1;
    private SimpleString subQueueName = null;
 
    private final int prefetchSize;
-   private AtomicInteger currentSize;
+   private AtomicInteger windowAvailable;
    private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<MessageInfo>();
+   private long messagePullSequence = 0;
+   private MessagePullHandler messagePullHandler;
 
-   public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info)
+   public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, ConsumerInfo info, ScheduledExecutorService scheduledPool)
    {
       this.session = amqSession;
       this.actualDest = d;
       this.info = info;
+      this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
-      this.currentSize = new AtomicInteger(0);
+      this.windowAvailable = new AtomicInteger(prefetchSize);
+      if (prefetchSize == 0)
+      {
+         messagePullHandler = new MessagePullHandler();
+      }
    }
 
    public void init() throws Exception
@@ -130,12 +144,12 @@ public class AMQConsumer implements BrowserListener
             coreSession.createQueue(address, subQueueName, selector, true, false);
          }
 
-         coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, Integer.MAX_VALUE);
+         coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
       }
       else
       {
          SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
-         coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, Integer.MAX_VALUE);
+         coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
       }
 
       if (info.isBrowser())
@@ -163,23 +177,14 @@ public class AMQConsumer implements BrowserListener
 
    public void acquireCredit(int n) throws Exception
    {
-      this.currentSize.addAndGet(-n);
-      if (currentSize.get() < prefetchSize)
+      boolean promptDelivery = windowAvailable.get() == 0;
+      if (windowAvailable.get() < prefetchSize)
       {
-         AtomicInteger credits = session.getCoreSession().getConsumerCredits(nativeId);
-         credits.set(0);
-         session.getCoreSession().receiveConsumerCredits(nativeId, Integer.MAX_VALUE);
+         this.windowAvailable.addAndGet(n);
       }
-   }
-
-   public void checkCreditOnDelivery() throws Exception
-   {
-      this.currentSize.incrementAndGet();
-
-      if (currentSize.get() == prefetchSize)
+      if (promptDelivery)
       {
-         //stop because reach prefetchSize
-         session.getCoreSession().receiveConsumerCredits(nativeId, 0);
+         session.getCoreSession().promptDelivery(nativeId);
       }
    }
 
@@ -188,12 +193,16 @@ public class AMQConsumer implements BrowserListener
       MessageDispatch dispatch;
       try
       {
+         if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message))
+         {
+            return 0;
+         }
          //decrement deliveryCount as AMQ client tends to add 1.
          dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
          int size = dispatch.getMessage().getSize();
          this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size));
          session.deliverMessage(dispatch);
-         checkCreditOnDelivery();
+         windowAvailable.decrementAndGet();
          return size;
       }
       catch (IOException e)
@@ -206,6 +215,16 @@ public class AMQConsumer implements BrowserListener
       }
    }
 
+   public void handleDeliverNullDispatch()
+   {
+      MessageDispatch md = new MessageDispatch();
+      md.setConsumerId(getId());
+      md.setDestination(actualDest);
+      session.deliverMessage(md);
+      windowAvailable.decrementAndGet();
+   }
+
+
    public void acknowledge(MessageAck ack) throws Exception
    {
       MessageId first = ack.getFirstMessageId();
@@ -400,4 +419,90 @@ public class AMQConsumer implements BrowserListener
    {
       return info;
    }
+
+   public boolean hasCredits()
+   {
+      return windowAvailable.get() > 0;
+   }
+
+   public void processMessagePull(MessagePull messagePull) throws Exception
+   {
+      windowAvailable.incrementAndGet();
+
+      if (messagePullHandler != null)
+      {
+         messagePullHandler.nextSequence(messagePullSequence++, messagePull.getTimeout());
+      }
+   }
+
+   public void removeConsumer() throws Exception
+   {
+      session.removeConsumer(nativeId);
+   }
+
+   private class MessagePullHandler
+   {
+      private long next = -1;
+      private long timeout;
+      private CountDownLatch latch = new CountDownLatch(1);
+      private ScheduledFuture<?> messagePullFuture;
+
+      public void nextSequence(long next, long timeout) throws Exception
+      {
+         this.next = next;
+         this.timeout = timeout;
+         latch = new CountDownLatch(1);
+         session.getCoreSession().forceConsumerDelivery(nativeId, messagePullSequence);
+         //if we are 0 timeout or less we need to wait to get either the forced message or a real message.
+         if (timeout <= 0)
+         {
+            latch.await(10, TimeUnit.SECONDS);
+            //this means we have received no message just the forced delivery message
+            if (this.next >= 0)
+            {
+               handleDeliverNullDispatch();
+            }
+         }
+      }
+
+      public boolean checkForcedConsumer(ServerMessage message)
+      {
+         if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+         {
+            System.out.println("MessagePullHandler.checkForcedConsumer");
+            if (next >= 0)
+            {
+               if (timeout <= 0)
+               {
+                  latch.countDown();
+               }
+               else
+               {
+                  messagePullFuture = scheduledPool.schedule(new Runnable()
+                  {
+                     @Override
+                     public void run()
+                     {
+                        if (next >= 0)
+                        {
+                           handleDeliverNullDispatch();
+                        }
+                     }
+                  }, timeout, TimeUnit.MILLISECONDS);
+               }
+            }
+            return false;
+         }
+         else
+         {
+            next = -1;
+            if (messagePullFuture != null)
+            {
+               messagePullFuture.cancel(true);
+            }
+            latch.countDown();
+            return true;
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index fd62964..168f557 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -16,13 +16,22 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
-public class AMQConsumerBrokerExchange
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessagePull;
+
+public abstract class AMQConsumerBrokerExchange
 {
+   protected final AMQSession amqSession;
    private AMQConnectionContext connectionContext;
    private AMQDestination regionDestination;
    private AMQSubscription subscription;
    private boolean wildcard;
 
+   public AMQConsumerBrokerExchange(AMQSession amqSession)
+   {
+      this.amqSession = amqSession;
+   }
+
    /**
     * @return the connectionContext
     */
@@ -90,4 +99,10 @@ public class AMQConsumerBrokerExchange
    {
       this.wildcard = wildcard;
    }
+
+   public abstract void acknowledge(MessageAck ack) throws Exception;
+
+   public abstract void processMessagePull(MessagePull messagePull) throws Exception;
+
+   public abstract void removeConsumer() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java
deleted file mode 100644
index 4fc3246..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.SessionId;
-
-public class AMQMapTransportConnectionStateRegister implements
-      AMQTransportConnectionStateRegister
-{
-
-   private Map<ConnectionId, AMQTransportConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, AMQTransportConnectionState>();
-
-   public AMQTransportConnectionState registerConnectionState(
-         ConnectionId connectionId, AMQTransportConnectionState state)
-   {
-      AMQTransportConnectionState rc = connectionStates
-            .put(connectionId, state);
-      return rc;
-   }
-
-   public AMQTransportConnectionState unregisterConnectionState(
-         ConnectionId connectionId)
-   {
-      AMQTransportConnectionState rc = connectionStates.remove(connectionId);
-      if (rc.getReferenceCounter().get() > 1)
-      {
-         rc.decrementReference();
-         connectionStates.put(connectionId, rc);
-      }
-      return rc;
-   }
-
-   public List<AMQTransportConnectionState> listConnectionStates()
-   {
-
-      List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>();
-      rc.addAll(connectionStates.values());
-      return rc;
-   }
-
-   public AMQTransportConnectionState lookupConnectionState(String connectionId)
-   {
-      return connectionStates.get(new ConnectionId(connectionId));
-   }
-
-   public AMQTransportConnectionState lookupConnectionState(ConsumerId id)
-   {
-      AMQTransportConnectionState cs = lookupConnectionState(id
-            .getConnectionId());
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a consumer from a connection that had not been registered: "
-                     + id.getParentId().getParentId());
-      }
-      return cs;
-   }
-
-   public AMQTransportConnectionState lookupConnectionState(ProducerId id)
-   {
-      AMQTransportConnectionState cs = lookupConnectionState(id
-            .getConnectionId());
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a producer from a connection that had not been registered: "
-                     + id.getParentId().getParentId());
-      }
-      return cs;
-   }
-
-   public AMQTransportConnectionState lookupConnectionState(SessionId id)
-   {
-      AMQTransportConnectionState cs = lookupConnectionState(id
-            .getConnectionId());
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a session from a connection that had not been registered: "
-                     + id.getParentId());
-      }
-      return cs;
-   }
-
-   public AMQTransportConnectionState lookupConnectionState(
-         ConnectionId connectionId)
-   {
-      AMQTransportConnectionState cs = connectionStates.get(connectionId);
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a connection that had not been registered: "
-                     + connectionId);
-      }
-      return cs;
-   }
-
-   public boolean doesHandleMultipleConnectionStates()
-   {
-      return true;
-   }
-
-   public boolean isEmpty()
-   {
-      return connectionStates.isEmpty();
-   }
-
-   public void clear()
-   {
-      connectionStates.clear();
-
-   }
-
-   public void intialize(AMQTransportConnectionStateRegister other)
-   {
-      connectionStates.clear();
-      connectionStates.putAll(other.mapStates());
-
-   }
-
-   public Map<ConnectionId, AMQTransportConnectionState> mapStates()
-   {
-      HashMap<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>(
-            connectionStates);
-      return map;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index ef64b6c..2c24903 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
@@ -69,12 +68,10 @@ public class AMQSession implements SessionCallback
    private SessionInfo sessInfo;
    private ActiveMQServer server;
    private OpenWireConnection connection;
-   //native id -> consumer
-   private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<Long, AMQConsumer>();
-   //amq id -> native id
-   private Map<Long, Long> consumerIdMap = new HashMap<Long, Long>();
 
-   private Map<Long, AMQProducer> producers = new HashMap<Long, AMQProducer>();
+   private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
+
+   private Map<Long, AMQProducer> producers = new HashMap<>();
 
    private AtomicBoolean started = new AtomicBoolean(false);
 
@@ -82,15 +79,18 @@ public class AMQSession implements SessionCallback
 
    private boolean isTx;
 
+   private final ScheduledExecutorService scheduledPool;
+
    private OpenWireProtocolManager manager;
 
    public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo,
-         ActiveMQServer server, OpenWireConnection connection, OpenWireProtocolManager manager)
+                     ActiveMQServer server, OpenWireConnection connection, ScheduledExecutorService scheduledPool, OpenWireProtocolManager manager)
    {
       this.connInfo = connInfo;
       this.sessInfo = sessInfo;
       this.server = server;
       this.connection = connection;
+      this.scheduledPool = scheduledPool;
       this.manager = manager;
    }
 
@@ -123,7 +123,7 @@ public class AMQSession implements SessionCallback
 
    }
 
-   public void createConsumer(ConsumerInfo info) throws Exception
+   public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception
    {
       //check destination
       ActiveMQDestination dest = info.getDestination();
@@ -136,7 +136,7 @@ public class AMQSession implements SessionCallback
       {
          dests = new ActiveMQDestination[] {dest};
       }
-
+      Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
       for (ActiveMQDestination d : dests)
       {
          if (d.isQueue())
@@ -144,11 +144,13 @@ public class AMQSession implements SessionCallback
             SimpleString queueName = OpenWireUtil.toCoreAddress(d);
             getCoreServer().getJMSQueueCreator().create(queueName);
          }
-         AMQConsumer consumer = new AMQConsumer(this, d, info);
+         AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
          consumer.init();
+         consumerMap.put(d, consumer);
          consumers.put(consumer.getNativeId(), consumer);
-         this.consumerIdMap.put(info.getConsumerId().getValue(), consumer.getNativeId());
       }
+      connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
+
       coreSession.start();
       started.set(true);
    }
@@ -214,7 +216,8 @@ public class AMQSession implements SessionCallback
    @Override
    public boolean hasCredits(ServerConsumer consumerID)
    {
-      return true;
+      AMQConsumer amqConsumer = consumers.get(consumerID.getID());
+      return amqConsumer.hasCredits();
    }
 
    @Override
@@ -234,19 +237,12 @@ public class AMQSession implements SessionCallback
       return this.server;
    }
 
-   public void removeConsumer(ConsumerInfo info) throws Exception
+   public void removeConsumer(long consumerId) throws Exception
    {
-      long consumerId = info.getConsumerId().getValue();
-      long nativeId = this.consumerIdMap.remove(consumerId);
-      if (this.txId != null || this.isTx)
-      {
-         ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, false);
-      }
-      else
-      {
-         ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, true);
-      }
-      AMQConsumer consumer = consumers.remove(nativeId);
+      boolean failed = !(this.txId != null || this.isTx);
+
+      coreSession.amqCloseConsumer(consumerId, failed);
+      consumers.remove(consumerId);
    }
 
    public void createProducer(ProducerInfo info) throws Exception
@@ -331,16 +327,13 @@ public class AMQSession implements SessionCallback
       return this.connection.getMarshaller();
    }
 
-   public void acknowledge(MessageAck ack) throws Exception
+   public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception
    {
       TransactionId tid = ack.getTransactionId();
       if (tid != null)
       {
          this.resetSessionTx(ack.getTransactionId());
       }
-      ConsumerId consumerId = ack.getConsumerId();
-      long nativeConsumerId = consumerIdMap.get(consumerId.getValue());
-      AMQConsumer consumer = consumers.get(nativeConsumerId);
       consumer.acknowledge(ack);
 
       if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
new file mode 100644
index 0000000..9a1d4f8
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessagePull;
+
+public class AMQSingleConsumerBrokerExchange extends AMQConsumerBrokerExchange
+{
+   private AMQConsumer consumer;
+
+   public AMQSingleConsumerBrokerExchange(AMQSession amqSession, AMQConsumer consumer)
+   {
+      super(amqSession);
+      this.consumer = consumer;
+   }
+
+   public void processMessagePull(MessagePull messagePull) throws Exception
+   {
+      consumer.processMessagePull(messagePull);
+   }
+
+   public void removeConsumer() throws Exception
+   {
+      consumer.removeConsumer();
+   }
+
+   public void acknowledge(MessageAck ack) throws Exception
+   {
+      amqSession.acknowledge(ack, consumer);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java
deleted file mode 100644
index 93d1591..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.SessionId;
-
-/**
- * We just copy this structure from amq, but what's the purpose
- * and can it be removed ?
- */
-public class AMQSingleTransportConnectionStateRegister implements
-      AMQTransportConnectionStateRegister
-{
-
-   private AMQTransportConnectionState connectionState;
-   private ConnectionId connectionId;
-
-   public AMQTransportConnectionState registerConnectionState(
-         ConnectionId connectionId, AMQTransportConnectionState state)
-   {
-      AMQTransportConnectionState rc = connectionState;
-      connectionState = state;
-      this.connectionId = connectionId;
-      return rc;
-   }
-
-   public synchronized AMQTransportConnectionState unregisterConnectionState(
-         ConnectionId connectionId)
-   {
-      AMQTransportConnectionState rc = null;
-
-      if (connectionId != null && connectionState != null
-            && this.connectionId != null)
-      {
-         if (this.connectionId.equals(connectionId))
-         {
-            rc = connectionState;
-            connectionState = null;
-            connectionId = null;
-         }
-      }
-      return rc;
-   }
-
-   public synchronized List<AMQTransportConnectionState> listConnectionStates()
-   {
-      List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>();
-      if (connectionState != null)
-      {
-         rc.add(connectionState);
-      }
-      return rc;
-   }
-
-   public synchronized AMQTransportConnectionState lookupConnectionState(
-         String connectionId)
-   {
-      AMQTransportConnectionState cs = connectionState;
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a connectionId for a connection that had not been registered: "
-                     + connectionId);
-      }
-      return cs;
-   }
-
-   public synchronized AMQTransportConnectionState lookupConnectionState(
-         ConsumerId id)
-   {
-      AMQTransportConnectionState cs = connectionState;
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a consumer from a connection that had not been registered: "
-                     + id.getParentId().getParentId());
-      }
-      return cs;
-   }
-
-   public synchronized AMQTransportConnectionState lookupConnectionState(
-         ProducerId id)
-   {
-      AMQTransportConnectionState cs = connectionState;
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a producer from a connection that had not been registered: "
-                     + id.getParentId().getParentId());
-      }
-      return cs;
-   }
-
-   public synchronized AMQTransportConnectionState lookupConnectionState(
-         SessionId id)
-   {
-      AMQTransportConnectionState cs = connectionState;
-      if (cs == null)
-      {
-         throw new IllegalStateException(
-               "Cannot lookup a session from a connection that had not been registered: "
-                     + id.getParentId());
-      }
-      return cs;
-   }
-
-   public synchronized AMQTransportConnectionState lookupConnectionState(
-         ConnectionId connectionId)
-   {
-      AMQTransportConnectionState cs = connectionState;
-      return cs;
-   }
-
-   public synchronized boolean doesHandleMultipleConnectionStates()
-   {
-      return false;
-   }
-
-   public synchronized boolean isEmpty()
-   {
-      return connectionState == null;
-   }
-
-   public void intialize(AMQTransportConnectionStateRegister other)
-   {
-
-      if (other.isEmpty())
-      {
-         clear();
-      }
-      else
-      {
-         Map map = other.mapStates();
-         Iterator i = map.entrySet().iterator();
-         Map.Entry<ConnectionId, AMQTransportConnectionState> entry = (Entry<ConnectionId, AMQTransportConnectionState>) i
-               .next();
-         connectionId = entry.getKey();
-         connectionState = entry.getValue();
-      }
-
-   }
-
-   public Map<ConnectionId, AMQTransportConnectionState> mapStates()
-   {
-      Map<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>();
-      if (!isEmpty())
-      {
-         map.put(connectionId, connectionState);
-      }
-      return map;
-   }
-
-   public void clear()
-   {
-      connectionState = null;
-      connectionId = null;
-
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionState.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionState.java
deleted file mode 100644
index a8e5973..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionState.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
-
-public class AMQTransportConnectionState extends ConnectionState
-{
-
-   private AMQConnectionContext context;
-   private OpenWireConnection connection;
-   private AtomicInteger referenceCounter = new AtomicInteger();
-   private final Object connectionMutex = new Object();
-
-   public AMQTransportConnectionState(ConnectionInfo info,
-         OpenWireConnection transportConnection)
-   {
-      super(info);
-      connection = transportConnection;
-   }
-
-   public AMQConnectionContext getContext()
-   {
-      return context;
-   }
-
-   public OpenWireConnection getConnection()
-   {
-      return connection;
-   }
-
-   public void setContext(AMQConnectionContext context)
-   {
-      this.context = context;
-   }
-
-   public void setConnection(OpenWireConnection connection)
-   {
-      this.connection = connection;
-   }
-
-   public int incrementReference()
-   {
-      return referenceCounter.incrementAndGet();
-   }
-
-   public int decrementReference()
-   {
-      return referenceCounter.decrementAndGet();
-   }
-
-   public AtomicInteger getReferenceCounter()
-   {
-      return referenceCounter;
-   }
-
-   public void setReferenceCounter(AtomicInteger referenceCounter)
-   {
-      this.referenceCounter = referenceCounter;
-   }
-
-   public Object getConnectionMutex()
-   {
-      return connectionMutex;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java
deleted file mode 100644
index 642f05b..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.SessionId;
-
-/**
- * What's the purpose of this?
- */
-public interface AMQTransportConnectionStateRegister
-{
-   AMQTransportConnectionState registerConnectionState(ConnectionId connectionId,
-         AMQTransportConnectionState state);
-
-   AMQTransportConnectionState unregisterConnectionState(ConnectionId connectionId);
-
-   List<AMQTransportConnectionState> listConnectionStates();
-
-   Map<ConnectionId, AMQTransportConnectionState> mapStates();
-
-   AMQTransportConnectionState lookupConnectionState(String connectionId);
-
-   AMQTransportConnectionState lookupConnectionState(ConsumerId id);
-
-   AMQTransportConnectionState lookupConnectionState(ProducerId id);
-
-   AMQTransportConnectionState lookupConnectionState(SessionId id);
-
-   AMQTransportConnectionState lookupConnectionState(ConnectionId connectionId);
-
-   boolean isEmpty();
-
-   boolean doesHandleMultipleConnectionStates();
-
-   void intialize(AMQTransportConnectionStateRegister other);
-
-   void clear();
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81756739/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 49a3926..118b7df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -788,6 +788,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       }
    }
 
+
+   public void promptDelivery(long consumerID)
+   {
+      ServerConsumer consumer = consumers.get(consumerID);
+
+      // this would be possible if the server consumer was closed by pings/pongs.. etc
+      if (consumer != null)
+      {
+         consumer.promptDelivery();
+      }
+   }
+
    public void acknowledge(final long consumerID, final long messageID) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);


Mime
View raw message