activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [37/42] activemq-artemis git commit: ARTEMIS-463 Refactoring on Openwire https://issues.apache.org/jira/browse/ARTEMIS-463
Date Mon, 04 Apr 2016 16:09:46 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index f61705e..89f71ed 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -96,10 +96,11 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
    private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
 
-   @Override
-   public ServerMessage inbound(Object message) {
-      // TODO: implement this
-      return null;
+
+   private final WireFormat marshaller;
+
+   public OpenWireMessageConverter(WireFormat marshaller) {
+      this.marshaller = marshaller;
    }
 
    @Override
@@ -108,10 +109,13 @@ public class OpenWireMessageConverter implements MessageConverter {
       return null;
    }
 
-   //convert an ActiveMQ Artemis message to coreMessage
-   public static void toCoreMessage(ServerMessageImpl coreMessage,
-                                    Message messageSend,
-                                    WireFormat marshaller) throws IOException {
+
+   @Override
+   public ServerMessage inbound(Object message) throws Exception {
+
+      Message messageSend = (Message)message;
+      ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+
       String type = messageSend.getType();
       if (type != null) {
          coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
@@ -391,6 +395,15 @@ public class OpenWireMessageConverter implements MessageConverter {
          coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
       }
       coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
+
+      ActiveMQDestination origDest = messageSend.getOriginalDestination();
+      if (origDest != null) {
+         ByteSequence origDestBytes = marshaller.marshal(origDest);
+         origDestBytes.compact();
+         coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+      }
+
+      return coreMessage;
    }
 
    private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
@@ -430,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    public static MessageDispatch createMessageDispatch(ServerMessage message,
                                                        int deliveryCount,
                                                        AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
+      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index f916c8f..bbbb696 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,16 +17,13 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.InvalidClientIDException;
-import java.util.ArrayList;
-import java.util.Collections;
+import javax.transaction.xa.XAException;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -37,24 +34,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -64,40 +53,31 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
 import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.LongSequenceGenerator;
 
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
 
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -109,32 +89,36 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    private OpenWireFormatFactory wireFactory;
 
-   private boolean tightEncodingEnabled = true;
-
    private boolean prefixPacketSize = true;
 
    private BrokerId brokerId;
    protected final ProducerId advisoryProducerId = new ProducerId();
 
-   // from broker
-   protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
-
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
 
-   protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
-
-   private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<>();
+   // TODO-NOW: this can probably go away
+   private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
 
    private String brokerName;
 
-   private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
-
+   // Clebert: Artemis already has a Resource Manager. Need to remove this..
+   //          The TransactionID extends XATransactionID, so all we need is to convert the XID here
    private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
 
-   private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
+   private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
+
+   private final LinkedList<TopologyMember> members = new LinkedList<>();
 
    private final ScheduledExecutorService scheduledPool;
 
+   //bean properties
+   //http://activemq.apache.org/failover-transport-reference.html
+   private boolean rebalanceClusterClients = false;
+   private boolean updateClusterClients = false;
+   private boolean updateClusterClientsOnRemove = false;
+
+   private final OpenWireMessageConverter messageConverter;
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -142,12 +126,82 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       // preferred prop, should be done via config
       wireFactory.setCacheEnabled(false);
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
-      ManagementService service = server.getManagementService();
       scheduledPool = server.getScheduledPool();
-      if (service != null) {
-         service.addNotificationListener(this);
+      this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
+
+      final ClusterManager clusterManager = this.server.getClusterManager();
+
+      // TODO-NOW: use a property name for the cluster connection
+      ClusterConnection cc = clusterManager.getDefaultConnection(null);
+
+      if (cc != null) {
+         cc.addClusterTopologyListener(this);
+      }
+   }
+
+   public OpenWireFormat getNewWireFormat() {
+      return (OpenWireFormat)wireFactory.createWireFormat();
+   }
+
+   @Override
+   public void nodeUP(TopologyMember member, boolean last) {
+      if (topologyMap.put(member.getNodeId(), member) == null) {
+         updateClientClusterInfo();
+      }
+   }
+
+   public void nodeDown(long eventUID, String nodeID) {
+      if (topologyMap.remove(nodeID) != null) {
+         updateClientClusterInfo();
+      }
+   }
+
+
+   public void removeConnection(ConnectionInfo info,
+                                Throwable error) throws InvalidClientIDException {
+      synchronized (clientIdSet) {
+         String clientId = info.getClientId();
+         if (clientId != null) {
+            AMQConnectionContext context = this.clientIdSet.get(clientId);
+            if (context != null && context.decRefCount() == 0) {
+               //connection is still there and needs to be closed
+               context.getConnection().disconnect(error != null);
+               this.connections.remove(this);//what's that for?
+               this.clientIdSet.remove(clientId);
+            }
+         }
+         else {
+            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
+         }
+      }
+   }
+
+
+   public ScheduledExecutorService getScheduledPool() {
+      return scheduledPool;
+   }
+
+   public ActiveMQServer getServer() {
+      return server;
+   }
+
+   private void updateClientClusterInfo() {
+
+      synchronized (members) {
+         members.clear();
+         members.addAll(topologyMap.values());
       }
 
+      for (OpenWireConnection c : this.connections) {
+         ConnectionControl control = newConnectionControl();
+         try {
+            c.updateClient(control);
+         }
+         catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+            c.sendException(e);
+         }
+      }
    }
 
    @Override
@@ -169,20 +223,20 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
-      OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
-      owConn.init();
+      OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf);
+      owConn.sendHandshake();
 
+      // TODO CLEBERT What is this constant here? we should get it from TTL initial pings
       return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
    }
 
    @Override
    public MessageConverter getConverter() {
-      return new OpenWireMessageConverter();
+      return messageConverter;
    }
 
    @Override
    public void removeHandler(String name) {
-      // TODO Auto-generated method stub
    }
 
    @Override
@@ -225,119 +279,60 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    @Override
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
-      // TODO Auto-generated method stub
-
-   }
-
-   public void handleCommand(OpenWireConnection openWireConnection, Object command) throws Exception {
-      Command amqCmd = (Command) command;
-      byte type = amqCmd.getDataStructureType();
-      switch (type) {
-         case CommandTypes.CONNECTION_INFO:
-            break;
-         case CommandTypes.CONNECTION_CONTROL:
-            /** The ConnectionControl packet sent from client informs the broker that is capable of supporting dynamic
-             * failover and load balancing.  These features are not yet implemented for Artemis OpenWire.  Instead we
-             * simply drop the packet.  See: ACTIVEMQ6-108 */
-            break;
-         case CommandTypes.MESSAGE_PULL:
-            MessagePull messagePull = (MessagePull) amqCmd;
-            openWireConnection.processMessagePull(messagePull);
-            break;
-         case CommandTypes.CONSUMER_CONTROL:
-            break;
-         default:
-            throw new IllegalStateException("Cannot handle command: " + command);
-      }
    }
 
-   public void sendReply(final OpenWireConnection connection, final Command command) {
-      server.getStorageManager().afterCompleteOperations(new IOCallback() {
-         @Override
-         public void onError(final int errorCode, final String errorMessage) {
-            ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
-         }
-
-         @Override
-         public void done() {
-            send(connection, command);
-         }
-      });
-   }
-
-   public boolean send(final OpenWireConnection connection, final Command command) {
-      if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQServerLogger.LOGGER.trace("sending " + command);
-      }
-      synchronized (connection) {
-         if (connection.isDestroyed()) {
-            return false;
-         }
-
-         try {
-            connection.physicalSend(command);
-         }
-         catch (Exception e) {
-            return false;
-         }
-         catch (Throwable t) {
-            return false;
-         }
-         return true;
-      }
-   }
-
-   public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception {
+   public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
       String username = info.getUserName();
       String password = info.getPassword();
 
       if (!this.validateUser(username, password)) {
          throw new SecurityException("User name [" + username + "] or password is invalid.");
       }
+
       String clientId = info.getClientId();
       if (clientId == null) {
          throw new InvalidClientIDException("No clientID specified for connection request");
       }
+
       synchronized (clientIdSet) {
-         AMQConnectionContext oldContext = clientIdSet.get(clientId);
-         if (oldContext != null) {
-            if (context.isAllowLinkStealing()) {
-               clientIdSet.remove(clientId);
-               if (oldContext.getConnection() != null) {
-                  OpenWireConnection connection = oldContext.getConnection();
-                  connection.disconnect(true);
-               }
-               else {
-                  // log error
-               }
+         AMQConnectionContext context;
+         context = clientIdSet.get(clientId);
+         if (context != null) {
+            if (info.isFailoverReconnect()) {
+               OpenWireConnection oldConnection = context.getConnection();
+               oldConnection.disconnect(true);
+               connections.remove(oldConnection);
+               connection.reconnect(context, info);
             }
             else {
-               throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + oldContext.getConnection().getRemoteAddress());
+               throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
             }
          }
          else {
+            //new connection
+            context = connection.initContext(info);
             clientIdSet.put(clientId, context);
          }
-      }
 
-      connections.add(context.getConnection());
+         connections.add(connection);
 
-      ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
-      // do not distribute passwords in advisory messages. usernames okay
-      ConnectionInfo copy = info.copy();
-      copy.setPassword("");
-      fireAdvisory(context, topic, copy);
-      connectionInfos.put(copy.getConnectionId(), copy);
+         ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
+         // do not distribute passwords in advisory messages. usernames okay
+         ConnectionInfo copy = info.copy();
+         copy.setPassword("");
+         fireAdvisory(context, topic, copy);
 
-      // init the conn
-      addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
+         // init the conn
+         context.getConnection().addSessions( context.getConnectionState().getSessionIds());
+      }
    }
 
-   private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
+   public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
       this.fireAdvisory(context, topic, copy, null);
    }
 
    public BrokerId getBrokerId() {
+      // TODO: Use the Storage ID here...
       if (brokerId == null) {
          brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
       }
@@ -347,7 +342,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    /*
     * See AdvisoryBroker.fireAdvisory()
     */
-   private void fireAdvisory(AMQConnectionContext context,
+   public void fireAdvisory(AMQConnectionContext context,
                              ActiveMQTopic topic,
                              Command command,
                              ConsumerId targetConsumerId) throws Exception {
@@ -372,13 +367,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       boolean originalFlowControl = context.isProducerFlowControl();
       final AMQProducerBrokerExchange producerExchange = new AMQProducerBrokerExchange();
       producerExchange.setConnectionContext(context);
-      producerExchange.setMutable(true);
       producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
       try {
          context.setProducerFlowControl(false);
          AMQSession sess = context.getConnection().getAdvisorySession();
          if (sess != null) {
-            sess.send(producerExchange, advisoryMessage, false);
+            sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false);
          }
       }
       finally {
@@ -392,220 +386,68 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
             brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
          }
          catch (Exception e) {
-            brokerName = "localhost";
+            brokerName = server.getNodeID().toString();
          }
       }
       return brokerName;
    }
 
-   public boolean isFaultTolerantConfiguration() {
-      return false;
-   }
-
-   public void postProcessDispatch(MessageDispatch md) {
-      // TODO Auto-generated method stub
-
-   }
-
-   public boolean isStopped() {
-      // TODO Auto-generated method stub
-      return false;
-   }
-
-   public void preProcessDispatch(MessageDispatch messageDispatch) {
-      // TODO Auto-generated method stub
+   protected ConnectionControl newConnectionControl() {
+      ConnectionControl control = new ConnectionControl();
 
-   }
+      String uri = generateMembersURI(rebalanceClusterClients);
+      control.setConnectedBrokers(uri);
 
-   public boolean isStopping() {
-      return false;
+      control.setRebalanceConnection(rebalanceClusterClients);
+      return control;
    }
 
-   public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception {
-      SessionId sessionId = info.getProducerId().getParentId();
-      ConnectionId connectionId = sessionId.getParentId();
-      ConnectionState cs = theConn.getState();
-      if (cs == null) {
-         throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
-      }
-      SessionState ss = cs.getSessionState(sessionId);
-      if (ss == null) {
-         throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
-      }
-      // Avoid replaying dup commands
-      if (!ss.getProducerIds().contains(info.getProducerId())) {
-
-         AMQSession amqSession = sessions.get(sessionId);
-         if (amqSession == null) {
-            throw new IllegalStateException("Session not exist! : " + sessionId);
-         }
+   private String generateMembersURI(boolean flip) {
+      String uri;
+      StringBuffer connectedBrokers = new StringBuffer();
+      String separator = "";
 
-         ActiveMQDestination destination = info.getDestination();
-         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
-            if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
-               throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
-            }
-            if (destination.isQueue()) {
-               OpenWireUtil.validateDestination(destination, amqSession);
+      synchronized (members) {
+         if (members.size() > 0) {
+            for (TopologyMember member : members) {
+               connectedBrokers.append(separator).append(member.toURI());
+               separator = ",";
             }
-            DestinationInfo destInfo = new DestinationInfo(theConn.getConext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
-            this.addDestination(theConn, destInfo);
-         }
 
-
-         amqSession.createProducer(info);
-
-         try {
-            ss.addProducer(info);
-         }
-         catch (IllegalStateException e) {
-            amqSession.removeProducer(info);
-         }
-
-      }
-
-   }
-
-   public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception {
-      // Todo: add a destination interceptors holder here (amq supports this)
-      SessionId sessionId = info.getConsumerId().getParentId();
-      ConnectionId connectionId = sessionId.getParentId();
-      ConnectionState cs = theConn.getState();
-      if (cs == null) {
-         throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
-      }
-      SessionState ss = cs.getSessionState(sessionId);
-      if (ss == null) {
-         throw new IllegalStateException(this.server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
-      }
-      // Avoid replaying dup commands
-      if (!ss.getConsumerIds().contains(info.getConsumerId())) {
-         ActiveMQDestination destination = info.getDestination();
-         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
-            if (theConn.getConsumerCount() >= theConn.getMaximumConsumersAllowedPerConnection()) {
-               throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumConsumersAllowedPerConnection());
+            // The flip rotates the member list so that URIs are distributed evenly when sent to the clients;
+            // in case of a failure, not all the connections will fail over to a single server.
+            if (flip && members.size() > 1) {
+               members.addLast(members.removeFirst());
             }
          }
-
-         AMQSession amqSession = sessions.get(sessionId);
-         if (amqSession == null) {
-            throw new IllegalStateException("Session not exist! : " + sessionId);
-         }
-
-         amqSession.createConsumer(info, amqSession);
-
-         ss.addConsumer(info);
-      }
-   }
-
-   public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) {
-      Iterator<SessionId> iter = sessionSet.iterator();
-      while (iter.hasNext()) {
-         SessionId sid = iter.next();
-         addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), true);
       }
-   }
-
-   public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) {
-      return addSession(theConn, ss, false);
-   }
 
-   public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, boolean internal) {
-      AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, server, theConn, scheduledPool, this);
-      amqSession.initialize();
-      amqSession.setInternal(internal);
-      sessions.put(ss.getSessionId(), amqSession);
-      sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
-      return amqSession;
+      uri = connectedBrokers.toString();
+      return uri;
    }
 
-   public void removeConnection(AMQConnectionContext context, ConnectionInfo info, Throwable error) {
-      // todo roll back tx
-      this.connections.remove(context.getConnection());
-      this.connectionInfos.remove(info.getConnectionId());
-      String clientId = info.getClientId();
-      if (clientId != null) {
-         this.clientIdSet.remove(clientId);
-      }
+   public boolean isFaultTolerantConfiguration() {
+      return false;
    }
 
-   public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
-      AMQSession session = sessions.remove(info.getSessionId());
-      if (session != null) {
-         session.close();
-      }
-   }
+   public void postProcessDispatch(MessageDispatch md) {
+      // TODO Auto-generated method stub
 
-   public void removeProducer(ProducerId id) {
-      SessionId sessionId = id.getParentId();
-      AMQSession session = sessions.get(sessionId);
-      session.removeProducer(id);
    }
 
-   public AMQSession getSession(SessionId sessionId) {
-      return sessions.get(sessionId);
+   public boolean isStopped() {
+      // TODO Auto-generated method stub
+      return false;
    }
 
-   public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception {
-      if (dest.isQueue()) {
-         SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
-         this.server.destroyQueue(qName);
-      }
-      else {
-         Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
-         Iterator<Binding> iterator = bindings.getBindings().iterator();
-
-         while (iterator.hasNext()) {
-            Queue b = (Queue) iterator.next().getBindable();
-            if (b.getConsumerCount() > 0) {
-               throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
-            }
-            if (b.isDurable()) {
-               throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
-            }
-            b.deleteQueue();
-         }
-      }
-
-      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-         AMQConnectionContext context = connection.getConext();
-         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
+   public void preProcessDispatch(MessageDispatch messageDispatch) {
+      // TODO Auto-generated method stub
 
-         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
-         fireAdvisory(context, topic, advInfo);
-      }
    }
 
-   public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
-      ActiveMQDestination dest = info.getDestination();
-      if (dest.isQueue()) {
-         SimpleString qName = OpenWireUtil.toCoreAddress(dest);
-         QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
-         if (binding == null) {
-            if (connection.getState().getInfo() != null) {
-
-               CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
-               server.getSecurityStore().check(qName, checkType, connection);
-
-               server.checkQueueCreationLimit(connection.getUsername());
-            }
-            ConnectionInfo connInfo = connection.getState().getInfo();
-            this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
-         }
-         if (dest.isTemporary()) {
-            connection.registerTempQueue(dest);
-         }
-      }
-
-      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-         AMQConnectionContext context = connection.getConext();
-         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
-
-         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
-         fireAdvisory(context, topic, advInfo);
-      }
+   public boolean isStopping() {
+      return false;
    }
-
    public void endTransaction(TransactionInfo info) throws Exception {
       AMQSession txSession = transactions.get(info.getTransactionId());
 
@@ -645,20 +487,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       if (txSession != null) {
          txSession.rollback(info);
       }
-      transactions.remove(info.getTransactionId());
-   }
-
-   public TransactionId[] recoverTransactions(Set<SessionId> sIds) {
-      List<TransactionId> recovered = new ArrayList<>();
-      if (sIds != null) {
-         for (SessionId sid : sIds) {
-            AMQSession s = this.sessions.get(sid);
-            if (s != null) {
-               s.recover(recovered);
-            }
-         }
+      else if (info.getTransactionId().isLocalTransaction()) {
+         //during a broker restart, recovered local transaction may not be registered
+         //in that case we ignore and let the tx removed silently by connection.
+         //see AMQ1925Test.testAMQ1925_TXBegin
+      }
+      else {
+         throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
       }
-      return recovered.toArray(new TransactionId[0]);
+      transactions.remove(info.getTransactionId());
    }
 
    public boolean validateUser(String login, String passcode) {
@@ -681,64 +518,63 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       transactions.remove(xid);
    }
 
+   /**
+    * TODO: remove this, use the regular ResourceManager from the Server's
+    */
    public void registerTx(TransactionId txId, AMQSession amqSession) {
       transactions.put(txId, amqSession);
    }
 
-   //advisory support
-   @Override
-   public void onNotification(Notification notif) {
-      try {
-         if (notif.getType() instanceof CoreNotificationType) {
-            CoreNotificationType type = (CoreNotificationType) notif.getType();
-            switch (type) {
-               case CONSUMER_SLOW:
-                  fireSlowConsumer(notif);
-                  break;
-               default:
-                  break;
-            }
-         }
-      }
-      catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
-      }
-   }
-
-   private void fireSlowConsumer(Notification notif) throws Exception {
-      SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
-      Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
-      SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
-      AMQSession session = sessions.get(sessionId);
-      AMQConsumer consumer = session.getConsumer(coreConsumerId);
-      ActiveMQDestination destination = consumer.getDestination();
-
-      if (!AdvisorySupport.isAdvisoryTopic(destination)) {
-         ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
-         ConnectionId connId = sessionId.getParentId();
-         OpenWireConnection cc = this.brokerConnectionStates.get(connId);
-         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-         advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
-
-         fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId());
-      }
-   }
-
    public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
       SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
       server.destroyQueue(subQueueName);
    }
 
-   public void sendBrokerInfo(OpenWireConnection connection) {
+   public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
       BrokerInfo brokerInfo = new BrokerInfo();
-      brokerInfo.setBrokerName(server.getIdentity());
-      brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
+      brokerInfo.setBrokerName(getBrokerName());
+      brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID()));
       brokerInfo.setPeerBrokerInfos(null);
       brokerInfo.setFaultTolerantConfiguration(false);
       brokerInfo.setBrokerURL(connection.getLocalAddress());
 
       //cluster support yet to support
       brokerInfo.setPeerBrokerInfos(null);
-      connection.dispatchAsync(brokerInfo);
+      connection.dispatch(brokerInfo);
+   }
+
+   public void setRebalanceClusterClients(boolean rebalance) {
+      this.rebalanceClusterClients = rebalance;
+   }
+
+   public boolean isRebalanceClusterClients() {
+      return this.rebalanceClusterClients;
+   }
+
+   public void setUpdateClusterClients(boolean updateClusterClients) {
+      this.updateClusterClients = updateClusterClients;
    }
+
+   public boolean isUpdateClusterClients() {
+      return this.updateClusterClients;
+   }
+
+   public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
+      this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
+   }
+
+   public boolean isUpdateClusterClientsOnRemove() {
+      return this.updateClusterClientsOnRemove;
+   }
+
+   public void setBrokerName(String name) {
+      this.brokerName = name;
+   }
+
+   public static XAException newXAException(String s, int errorCode) {
+      XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode);
+      xaException.errorCode = errorCode;
+      return xaException;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
index d684761..4513eb3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
@@ -18,16 +18,12 @@ package org.apache.activemq.artemis.core.protocol.openwire;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.artemis.api.core.SimpleString;
 
 public class OpenWireUtil {
 
@@ -64,23 +60,6 @@ public class OpenWireUtil {
       }
    }
 
-   /**
-    * Checks to see if this destination exists.  If it does not throw an invalid destination exception.
-    *
-    * @param destination
-    * @param amqSession
-    */
-   public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception {
-      if (destination.isQueue()) {
-         AMQServerSession coreSession = amqSession.getCoreSession();
-         SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
-         BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
-         if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
-            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
-         }
-      }
-   }
-
    /*
     *This util converts amq wildcards to compatible core wildcards
     *The conversion is like this:

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
deleted file mode 100644
index 0e21ca4..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-
-public class SendingResult {
-
-   private boolean blockNextSend;
-   private PagingStoreImpl blockPagingStore;
-   private SimpleString blockingAddress;
-
-   public void setBlockNextSend(boolean block) {
-      this.blockNextSend = block;
-   }
-
-   public boolean isBlockNextSend() {
-      return this.blockNextSend;
-   }
-
-   public void setBlockPagingStore(PagingStoreImpl store) {
-      this.blockPagingStore = store;
-   }
-
-   public PagingStoreImpl getBlockPagingStore() {
-      return this.blockPagingStore;
-   }
-
-   public void setBlockingAddress(SimpleString address) {
-      this.blockingAddress = address;
-   }
-
-   public SimpleString getBlockingAddress() {
-      return this.blockingAddress;
-   }
-
-   public boolean isSendFailIfNoSpace() {
-      AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy();
-      return policy == AddressFullMessagePolicy.FAIL;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
index 7e83767..56b4b6d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
@@ -20,15 +20,20 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessagePull;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange {
 
    private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
 
-   public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap) {
+   public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, List<AMQConsumer> consumerList) {
       super(amqSession);
-      this.consumerMap = consumerMap;
+      this.consumerMap = new HashMap<>();
+      for (AMQConsumer consumer : consumerList) {
+         consumerMap.put(consumer.getOpenwireDestination(), consumer);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
index a79911c..8071d04 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
@@ -17,9 +17,11 @@
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.WireFormatInfo;
@@ -47,6 +49,8 @@ public class AMQConnectionContext {
    private boolean clientMaster = true;
    private ConnectionState connectionState;
    private XATransactionId xid;
+   private AtomicInteger refCount = new AtomicInteger(1);
+   private Command lastCommand;
 
    public AMQConnectionContext() {
       this.messageEvaluationContext = new MessageEvaluationContext();
@@ -248,4 +252,19 @@ public class AMQConnectionContext {
       return false;
    }
 
+   public void incRefCount() {
+      refCount.incrementAndGet();
+   }
+
+   public int decRefCount() {
+      return refCount.decrementAndGet();
+   }
+
+   public void setLastCommand(Command lastCommand) {
+      this.lastCommand = lastCommand;
+   }
+
+   public Command getLastCommand() {
+      return this.lastCommand;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 7da1f3e..ef9b2a8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,7 +27,15 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
@@ -36,23 +44,15 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-
-public class AMQConsumer implements BrowserListener {
 
+public class AMQConsumer {
    private AMQSession session;
-   private org.apache.activemq.command.ActiveMQDestination actualDest;
+   private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
    private long nativeId = -1;
-   private SimpleString subQueueName = null;
 
-   private final int prefetchSize;
+   private int prefetchSize;
    private AtomicInteger windowAvailable;
    private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<>();
    private long messagePullSequence = 0;
@@ -63,7 +63,7 @@ public class AMQConsumer implements BrowserListener {
                       ConsumerInfo info,
                       ScheduledExecutorService scheduledPool) {
       this.session = amqSession;
-      this.actualDest = d;
+      this.openwireDestination = d;
       this.info = info;
       this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
@@ -73,75 +73,102 @@ public class AMQConsumer implements BrowserListener {
       }
    }
 
-   public void init() throws Exception {
-      AMQServerSession coreSession = session.getCoreSession();
+   public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
+      this.nativeId = nativeId;
+      AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener);
+      serverConsumer.setAmqConsumer(this);
+   }
+
+
+   private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
 
       SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
 
-      nativeId = session.getCoreServer().getStorageManager().generateID();
+      String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName());
+
+      SimpleString address;
+
+      if (openwireDestination.isTopic()) {
+         address = new SimpleString("jms.topic." + physicalName);
 
-      SimpleString address = new SimpleString(this.actualDest.getPhysicalName());
+         SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
 
-      if (this.actualDest.isTopic()) {
-         String physicalName = this.actualDest.getPhysicalName();
-         if (physicalName.contains(".>")) {
-            //wildcard
-            physicalName = OpenWireUtil.convertWildcard(physicalName);
+         AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
+         serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
+         return serverConsumer;
+      }
+      else {
+         SimpleString queueName = new SimpleString("jms.queue." + physicalName);
+         AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+         serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
+         AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
+         if (addrSettings != null) {
+            //see PolicyEntry
+            if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
+               //sends back a ConsumerControl
+               ConsumerControl cc = new ConsumerControl();
+               cc.setConsumerId(info.getConsumerId());
+               cc.setPrefetch(0);
+               session.getConnection().dispatch(cc);
+            }
          }
 
-         // on recreate we don't need to create queues
-         address = new SimpleString("jms.topic." + physicalName);
-         if (info.isDurable()) {
-            subQueueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, info.getClientId(), info.getSubscriptionName()));
-
-            QueueQueryResult result = coreSession.executeQueueQuery(subQueueName);
-            if (result.isExists()) {
-               // Already exists
-               if (result.getConsumerCount() > 0) {
-                  throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
-               }
+         return serverConsumer;
 
-               SimpleString oldFilterString = result.getFilterString();
+      }
 
-               boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
+   }
 
-               SimpleString oldTopicName = result.getAddress();
+   private SimpleString createTopicSubscription(boolean isDurable,
+                                                String clientID,
+                                                String physicalName,
+                                                String subscriptionName,
+                                                SimpleString selector,
+                                                SimpleString address) throws Exception {
+
+      SimpleString queueName;
+
+      if (isDurable) {
+         queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
+         QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
+         if (result.isExists()) {
+            // Already exists
+            if (result.getConsumerCount() > 0) {
+               throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+            }
 
-               boolean topicChanged = !oldTopicName.equals(address);
+            SimpleString oldFilterString = result.getFilterString();
 
-               if (selectorChanged || topicChanged) {
-                  // Delete the old durable sub
-                  coreSession.deleteQueue(subQueueName);
+            boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
 
-                  // Create the new one
-                  coreSession.createQueue(address, subQueueName, selector, false, true);
-               }
+            SimpleString oldTopicName = result.getAddress();
 
-            }
-            else {
-               coreSession.createQueue(address, subQueueName, selector, false, true);
+            boolean topicChanged = !oldTopicName.equals(address);
+
+            if (selectorChanged || topicChanged) {
+               // Delete the old durable sub
+               session.getCoreSession().deleteQueue(queueName);
+
+               // Create the new one
+               session.getCoreSession().createQueue(address, queueName, selector, false, true);
             }
          }
          else {
-            subQueueName = new SimpleString(UUID.randomUUID().toString());
-
-            coreSession.createQueue(address, subQueueName, selector, true, false);
+            session.getCoreSession().createQueue(address, queueName, selector, false, true);
          }
-
-         coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
       }
       else {
-         SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
-         coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
-      }
+         queueName = new SimpleString(UUID.randomUUID().toString());
+
+         session.getCoreSession().createQueue(address, queueName, selector, true, false);
 
-      if (info.isBrowser()) {
-         AMQServerConsumer coreConsumer = coreSession.getConsumer(nativeId);
-         coreConsumer.setBrowserListener(this);
       }
 
+      return queueName;
    }
 
+
+
    public long getNativeId() {
       return this.nativeId;
    }
@@ -189,7 +216,7 @@ public class AMQConsumer implements BrowserListener {
    public void handleDeliverNullDispatch() {
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(getId());
-      md.setDestination(actualDest);
+      md.setDestination(openwireDestination);
       session.deliverMessage(md);
       windowAvailable.decrementAndGet();
    }
@@ -210,9 +237,16 @@ public class AMQConsumer implements BrowserListener {
             mi = iter.next();
             if (mi.amqId.equals(lastm)) {
                n++;
-               iter.remove();
-               session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
-               session.getCoreSession().commit();
+               if (!isLocalTx) {
+                  iter.remove();
+                  session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
+               }
+               else {
+                  mi.setLocalAcked(true);
+               }
+               if (tid == null) {
+                  session.getCoreSession().commit();
+               }
                break;
             }
          }
@@ -220,7 +254,7 @@ public class AMQConsumer implements BrowserListener {
       else if (ack.isRedeliveredAck()) {
          //client tells that this message is for redlivery.
          //do nothing until poisoned.
-         n = 1;
+         n = ack.getMessageCount();
       }
       else if (ack.isPoisonAck()) {
          //send to dlq
@@ -251,7 +285,7 @@ public class AMQConsumer implements BrowserListener {
       }
       else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
          //ToDo: implement with tests
-         n = 1;
+         n = ack.getMessageCount();
       }
       else {
          Iterator<MessageInfo> iter = deliveringRefs.iterator();
@@ -294,7 +328,6 @@ public class AMQConsumer implements BrowserListener {
       acquireCredit(n);
    }
 
-   @Override
    public void browseFinished() {
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(info.getConsumerId());
@@ -304,11 +337,6 @@ public class AMQConsumer implements BrowserListener {
       session.deliverMessage(md);
    }
 
-   public boolean handledTransactionalMsg() {
-      // TODO Auto-generated method stub
-      return false;
-   }
-
    //this is called before session commit a local tx
    public void finishTx() throws Exception {
       MessageInfo lastMi = null;
@@ -346,10 +374,6 @@ public class AMQConsumer implements BrowserListener {
       }
    }
 
-   public org.apache.activemq.command.ActiveMQDestination getDestination() {
-      return actualDest;
-   }
-
    public ConsumerInfo getInfo() {
       return info;
    }
@@ -370,10 +394,22 @@ public class AMQConsumer implements BrowserListener {
       session.removeConsumer(nativeId);
    }
 
-   public org.apache.activemq.command.ActiveMQDestination getActualDestination() {
-      return actualDest;
+   public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
+      return openwireDestination;
+   }
+
+   public void setPrefetchSize(int prefetchSize) {
+      this.prefetchSize = prefetchSize;
+      this.windowAvailable.set(prefetchSize);
+      this.info.setPrefetchSize(prefetchSize);
+      if (this.prefetchSize > 0) {
+         session.getCoreSession().promptDelivery(nativeId);
+      }
    }
 
+   /**
+    * The MessagePullHandler is used with slow consumer policies.
+    * */
    private class MessagePullHandler {
 
       private long next = -1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index 800ee3f..21a45b1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -22,41 +22,11 @@ import org.apache.activemq.command.MessagePull;
 public abstract class AMQConsumerBrokerExchange {
 
    protected final AMQSession amqSession;
-   private AMQConnectionContext connectionContext;
-   private boolean wildcard;
 
    public AMQConsumerBrokerExchange(AMQSession amqSession) {
       this.amqSession = amqSession;
    }
 
-   /**
-    * @return the connectionContext
-    */
-   public AMQConnectionContext getConnectionContext() {
-      return this.connectionContext;
-   }
-
-   /**
-    * @param connectionContext the connectionContext to set
-    */
-   public void setConnectionContext(AMQConnectionContext connectionContext) {
-      this.connectionContext = connectionContext;
-   }
-
-   /**
-    * @return the wildcard
-    */
-   public boolean isWildcard() {
-      return this.wildcard;
-   }
-
-   /**
-    * @param wildcard the wildcard to set
-    */
-   public void setWildcard(boolean wildcard) {
-      this.wildcard = wildcard;
-   }
-
    public abstract void acknowledge(MessageAck ack) throws Exception;
 
    public abstract void processMessagePull(MessagePull messagePull) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java
deleted file mode 100644
index 848325e..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-
-public class AMQProducer {
-
-   private AMQSession amqSession;
-   private ProducerInfo info;
-
-   public AMQProducer(AMQSession amqSession, ProducerInfo info) {
-      this.amqSession = amqSession;
-      this.info = info;
-   }
-
-   public void init() throws Exception {
-      // If the destination is specified check that it exists.
-      if (info.getDestination() != null) {
-         OpenWireUtil.validateDestination(info.getDestination(), amqSession);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index f94c119..220c7fc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -16,34 +16,16 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.state.ProducerState;
 
 public class AMQProducerBrokerExchange {
 
    private AMQConnectionContext connectionContext;
    private ProducerState producerState;
-   private boolean mutable = true;
-   private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
-   private boolean auditProducerSequenceIds;
-   private boolean isNetworkProducer;
-   private final FlowControlInfo flowControlInfo = new FlowControlInfo();
 
    public AMQProducerBrokerExchange() {
    }
 
-   public AMQProducerBrokerExchange copy() {
-      AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange();
-      rc.connectionContext = connectionContext.copy();
-      rc.producerState = producerState;
-      rc.mutable = mutable;
-      return rc;
-   }
-
    /**
     * @return the connectionContext
     */
@@ -59,20 +41,6 @@ public class AMQProducerBrokerExchange {
    }
 
    /**
-    * @return the mutable
-    */
-   public boolean isMutable() {
-      return this.mutable;
-   }
-
-   /**
-    * @param mutable the mutable to set
-    */
-   public void setMutable(boolean mutable) {
-      this.mutable = mutable;
-   }
-
-   /**
     * @return the producerState
     */
    public ProducerState getProducerState() {
@@ -86,119 +54,6 @@ public class AMQProducerBrokerExchange {
       this.producerState = producerState;
    }
 
-   /**
-    * Enforce duplicate suppression using info from persistence adapter
-    *
-    * @return false if message should be ignored as a duplicate
-    */
-   public boolean canDispatch(Message messageSend) {
-      boolean canDispatch = true;
-      if (auditProducerSequenceIds && messageSend.isPersistent()) {
-         final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
-         if (isNetworkProducer) {
-            // messages are multiplexed on this producer so we need to query the
-            // persistenceAdapter
-            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
-            if (producerSequenceId <= lastStoredForMessageProducer) {
-               canDispatch = false;
-            }
-         }
-         else if (producerSequenceId <= lastSendSequenceNumber.get()) {
-            canDispatch = false;
-            if (messageSend.isInTransaction()) {
-            }
-            else {
-            }
-         }
-         else {
-            // track current so we can suppress duplicates later in the stream
-            lastSendSequenceNumber.set(producerSequenceId);
-         }
-      }
-      return canDispatch;
-   }
-
-   private long getStoredSequenceIdForMessage(MessageId messageId) {
-      return -1;
-   }
-
    public void setLastStoredSequenceId(long l) {
    }
-
-   public void incrementSend() {
-      flowControlInfo.incrementSend();
-   }
-
-   public void blockingOnFlowControl(boolean blockingOnFlowControl) {
-      flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
-   }
-
-   public boolean isBlockedForFlowControl() {
-      return flowControlInfo.isBlockingOnFlowControl();
-   }
-
-   public void resetFlowControl() {
-      flowControlInfo.reset();
-   }
-
-   public long getTotalTimeBlocked() {
-      return flowControlInfo.getTotalTimeBlocked();
-   }
-
-   public int getPercentageBlocked() {
-      double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
-      return (int) value * 100;
-   }
-
-   public static class FlowControlInfo {
-
-      private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
-      private AtomicLong totalSends = new AtomicLong();
-      private AtomicLong sendsBlocked = new AtomicLong();
-      private AtomicLong totalTimeBlocked = new AtomicLong();
-
-      public boolean isBlockingOnFlowControl() {
-         return blockingOnFlowControl.get();
-      }
-
-      public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
-         this.blockingOnFlowControl.set(blockingOnFlowControl);
-         if (blockingOnFlowControl) {
-            incrementSendBlocked();
-         }
-      }
-
-      public long getTotalSends() {
-         return totalSends.get();
-      }
-
-      public void incrementSend() {
-         this.totalSends.incrementAndGet();
-      }
-
-      public long getSendsBlocked() {
-         return sendsBlocked.get();
-      }
-
-      public void incrementSendBlocked() {
-         this.sendsBlocked.incrementAndGet();
-      }
-
-      public long getTotalTimeBlocked() {
-         return totalTimeBlocked.get();
-      }
-
-      public void incrementTimeBlocked(long time) {
-         this.totalTimeBlocked.addAndGet(time);
-      }
-
-      public void reset() {
-         blockingOnFlowControl.set(false);
-         totalSends.set(0);
-         sendsBlocked.set(0);
-         totalTimeBlocked.set(0);
-
-      }
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index 3e7afa5..2f9d0bc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -23,8 +23,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -34,6 +32,18 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public class AMQServerConsumer extends ServerConsumerImpl {
 
+   // TODO-NOW: remove this once unified
+   AMQConsumer amqConsumer;
+
+   public AMQConsumer getAmqConsumer() {
+      return amqConsumer;
+   }
+
+   /** TODO-NOW: remove this once unified */
+   public void setAmqConsumer(AMQConsumer amqConsumer) {
+      this.amqConsumer = amqConsumer;
+   }
+
    public AMQServerConsumer(long consumerID,
                             AMQServerSession serverSession,
                             QueueBinding binding,
@@ -51,81 +61,6 @@ public class AMQServerConsumer extends ServerConsumerImpl {
       super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
    }
 
-   public void setBrowserListener(BrowserListener listener) {
-      AMQBrowserDeliverer newBrowserDeliverer = new AMQBrowserDeliverer(this.browserDeliverer);
-      newBrowserDeliverer.listener = listener;
-      this.browserDeliverer = newBrowserDeliverer;
-   }
-
-   private class AMQBrowserDeliverer extends BrowserDeliverer {
-
-      private BrowserListener listener = null;
-
-      public AMQBrowserDeliverer(final BrowserDeliverer other) {
-         super(other.iterator);
-      }
-
-      @Override
-      public synchronized void run() {
-         // if the reference was busy during the previous iteration, handle it now
-         if (current != null) {
-            try {
-               HandleStatus status = handle(current);
-
-               if (status == HandleStatus.BUSY) {
-                  return;
-               }
-
-               if (status == HandleStatus.HANDLED) {
-                  proceedDeliver(current);
-               }
-
-               current = null;
-            }
-            catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current);
-               return;
-            }
-         }
-
-         MessageReference ref = null;
-         HandleStatus status;
-
-         while (true) {
-            try {
-               ref = null;
-               synchronized (messageQueue) {
-                  if (!iterator.hasNext()) {
-                     //here we need to send a null for amq browsers
-                     if (listener != null) {
-                        listener.browseFinished();
-                     }
-                     break;
-                  }
-
-                  ref = iterator.next();
-
-                  status = handle(ref);
-               }
-
-               if (status == HandleStatus.HANDLED) {
-                  proceedDeliver(ref);
-               }
-               else if (status == HandleStatus.BUSY) {
-                  // keep a reference on the current message reference
-                  // to handle it next time the browser deliverer is executed
-                  current = ref;
-                  break;
-               }
-            }
-            catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref);
-               break;
-            }
-         }
-      }
-   }
-
    public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
       synchronized (this.deliveringRefs) {
          for (MessageReference ref : refs) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index 0a3804c..3f0259d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl {
 
    @Override
    protected void doClose(final boolean failed) throws Exception {
+      Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
+      for (ServerConsumer consumer : consumersClone) {
+         AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
+         amqConsumer.setStarted(false);
+      }
+
       synchronized (this) {
          if (tx != null && tx.getXid() == null) {
             ((AMQTransactionImpl) tx).setRollbackForClose();
@@ -143,6 +150,8 @@ public class AMQServerSession extends ServerSessionImpl {
    }
 
    //amq specific behavior
+
+   // TODO: move this to AMQSession
    public void amqRollback(Set<Long> acked) throws Exception {
       if (tx == null) {
          // Might be null if XA
@@ -218,7 +227,9 @@ public class AMQServerSession extends ServerSessionImpl {
                                         final boolean supportLargeMessage,
                                         final Integer credits) throws Exception {
       if (this.internal) {
-         //internal sessions doesn't check security
+         // Clebert TODO: PQP!!!!!!!!!!!!!!!!!!!!
+
+         // Internal sessions don't check security. Why? What's the reason for that? Where is a link explaining it?
 
          Binding binding = postOffice.getBinding(queueName);
 
@@ -309,6 +320,8 @@ public class AMQServerSession extends ServerSessionImpl {
       return queue;
    }
 
+
+   // Clebert TODO: Get rid of these methods
    @Override
    protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
       if (!this.internal) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
index 9ce21e3..a6ca4a0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
@@ -32,6 +32,15 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public class AMQServerSessionFactory implements ServerSessionFactory {
 
+   private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory();
+
+   public static AMQServerSessionFactory getInstance() {
+      return singleInstance;
+   }
+
+   private AMQServerSessionFactory() {
+   }
+
    @Override
    public ServerSessionImpl createCoreSession(String name,
                                               String username,


Mime
View raw message