activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [4/4] activemq-artemis git commit: major refactoring on Transactions and AMQ objects
Date Fri, 01 Apr 2016 02:20:20 GMT
major refactoring on Transactions and AMQ objects


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fb445681
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fb445681
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fb445681

Branch: refs/heads/refactor-openwire
Commit: fb4456813672e7e00e29502aa243205ef6cc191c
Parents: 41a55a1
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Mar 31 14:48:56 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Mar 31 22:19:47 2016 -0400

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |   7 +-
 .../protocol/mqtt/MQTTConnectionManager.java    |   6 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java |   7 +-
 .../protocol/openwire/AMQTransactionImpl.java   |  59 --
 .../protocol/openwire/OpenWireConnection.java   | 385 ++++++++++---
 .../openwire/OpenWireMessageConverter.java      |   1 +
 .../openwire/OpenWireProtocolManager.java       | 117 +---
 .../core/protocol/openwire/OpenWireUtil.java    |  73 ---
 .../amq/AMQCompositeConsumerBrokerExchange.java |   9 +-
 .../core/protocol/openwire/amq/AMQConsumer.java | 247 +++-----
 .../openwire/amq/AMQConsumerBrokerExchange.java |   2 +
 .../openwire/amq/AMQServerConsumer.java         | 102 ----
 .../protocol/openwire/amq/AMQServerSession.java | 391 -------------
 .../openwire/amq/AMQServerSessionFactory.java   |  69 ---
 .../core/protocol/openwire/amq/AMQSession.java  | 237 ++------
 .../amq/AMQSingleConsumerBrokerExchange.java    |   8 +-
 .../openwire/amq/AMQTransactionFactory.java     |  32 --
 .../core/protocol/openwire/amq/MessageInfo.java |  47 --
 .../protocol/openwire/util/OpenWireUtil.java    |  83 +++
 .../protocol/stomp/StompProtocolManager.java    |   4 +-
 .../core/protocol/stomp/StompSession.java       |   5 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |  27 +-
 .../core/impl/ActiveMQPacketHandler.java        |   2 +-
 .../protocol/core/impl/CoreSessionCallback.java |   5 +-
 .../artemis/core/server/ActiveMQServer.java     |   1 -
 .../artemis/core/server/MessageReference.java   |  12 +
 .../activemq/artemis/core/server/Queue.java     |   2 +
 .../artemis/core/server/ServerConsumer.java     |  12 +
 .../artemis/core/server/ServerSession.java      |  26 +
 .../core/server/impl/ActiveMQServerImpl.java    |  11 +-
 .../core/server/impl/LastValueQueue.java        |  16 +
 .../core/server/impl/MessageReferenceImpl.java  |  24 +-
 .../artemis/core/server/impl/QueueImpl.java     |  39 +-
 .../core/server/impl/ServerConsumerImpl.java    |  83 ++-
 .../core/server/impl/ServerSessionImpl.java     | 138 ++---
 .../artemis/core/transaction/Transaction.java   |   7 +
 .../core/transaction/TransactionFactory.java    |  26 -
 .../core/transaction/impl/TransactionImpl.java  |  12 +
 .../spi/core/protocol/SessionCallback.java      |  11 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |   4 +
 .../failover/FailoverTransactionTest.java       |  10 +-
 .../integration/client/HangConsumerTest.java    |  10 +-
 .../integration/openwire/BasicSecurityTest.java |   9 +-
 .../integration/openwire/OpenWireUtilTest.java  |   2 +-
 .../openwire/SimpleOpenWireTest.java            | 572 ++++++++++++++++++-
 .../InvestigationOpenwireTest.java              | 246 --------
 .../core/postoffice/impl/BindingsImplTest.java  |  10 +
 .../unit/core/postoffice/impl/FakeQueue.java    |   5 +
 48 files changed, 1434 insertions(+), 1779 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 5d6af2a..f101dc7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.qpid.proton.amqp.Binary;
@@ -117,7 +118,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
                                                         false, // boolean autoCommitAcks,
                                                         false, // boolean preAcknowledge,
                                                         true, //boolean xa,
-                                                        (String) null, this, null, true);
+                                                        (String) null, this, true);
    }
 
    @Override
@@ -341,7 +342,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
-   public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+   public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
 
       ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
 
@@ -359,7 +360,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
-   public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+   public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
       return 0;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 9d60513..a3b8b78 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -96,7 +96,11 @@ public class MQTTConnectionManager {
       String id = UUIDGenerator.getInstance().generateStringUUID();
       ActiveMQServer server = session.getServer();
 
-      ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), null, // Session factory
+      ServerSession serverSession = server.createSession(id, username, password,
+                                                         ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                         session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS,
+                                                         MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE,
+                                                         MQTTUtil.SESSION_XA, null, session.getSessionCallback(),
                                                          MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
       return (ServerSessionImpl) serverSession;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 28d86b8..82b1ed6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -18,6 +18,7 @@
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -41,7 +42,7 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+   public int sendMessage(MessageReference referece, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
       try {
          session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
       }
@@ -62,8 +63,8 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
-      return sendMessage(message, consumer, deliveryCount);
+   public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+      return sendMessage(reference, message, consumer, deliveryCount);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
deleted file mode 100644
index bbd7e95..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.RefsOperation;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-
-import javax.transaction.xa.Xid;
-
-public class AMQTransactionImpl extends TransactionImpl {
-
-   private boolean rollbackForClose = false;
-
-   public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) {
-      super(xid, storageManager, timeoutSeconds);
-   }
-
-   @Override
-   public RefsOperation createRefsOperation(Queue queue) {
-      return new AMQrefsOperation(queue, storageManager);
-   }
-
-   public class AMQrefsOperation extends RefsOperation {
-
-      public AMQrefsOperation(Queue queue, StorageManager storageManager) {
-         super(queue, storageManager);
-      }
-
-
-      // This is because the Rollbacks happen through the consumer, not through the server's
-      @Override
-      public void afterRollback(Transaction tx) {
-         if (rollbackForClose) {
-            super.afterRollback(tx);
-         }
-      }
-   }
-
-   public void setRollbackForClose() {
-      this.rollbackForClose = true;
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e8259c3..f1eb8c6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -16,15 +16,18 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire;
 
+import javax.jms.IllegalStateException;
 import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSSecurityException;
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,22 +48,30 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionConte
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.server.impl.RefsOperation;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -131,28 +142,39 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
 
 
-   private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<>();
-   private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<>();
+   private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>();
+   private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>();
 
    // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
    private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
 
-
-
    private ConnectionState state;
 
    private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>();
 
-   private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<>();
+
+   /** Openwire doesn't sen transactions associated with any sessions.
+    *  It will however send beingTX / endTX as it would be doing it with XA Transactions.
+    *  But always without any association with Sessions.
+    *  This collection will hold nonXA transactions. Hopefully while they are in transit only. */
+   private Map<TransactionId, Transaction> txMap = new ConcurrentHashMap<>();
 
    private volatile AMQSession advisorySession;
 
+   private final ActiveMQServer server;
+
+   /** This is to be used with connection operations that don't have  a session.
+    * Such as TM operations. */
+   private ServerSession internalSession;
+
    // TODO-NOW: check on why there are two connections created for every createConnection on the client.
    public OpenWireConnection(Connection connection,
+                             ActiveMQServer server,
                              Executor executor,
                              OpenWireProtocolManager openWireProtocolManager,
                              OpenWireFormat wf) {
       super(connection, executor);
+      this.server = server;
       this.protocolManager = openWireProtocolManager;
       this.wireFormat = wf;
    }
@@ -227,8 +249,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                response = command.visit(commandProcessorInstance);
             }
             catch (Exception e) {
+               // TODO: logging
+               e.printStackTrace();
                if (responseRequired) {
-                  response = new ExceptionResponse(e);
+                  response = convertException(e);
                }
             }
             finally {
@@ -276,6 +300,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    }
 
    public void sendException(Exception e) {
+      Response resp = convertException(e);
+      try {
+         dispatch(resp);
+      }
+      catch (IOException e2) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
+      }
+   }
+
+   private Response convertException(Exception e) {
       Response resp;
       if (e instanceof ActiveMQSecurityException) {
          resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
@@ -286,12 +320,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       else {
          resp = new ExceptionResponse(e);
       }
-      try {
-         dispatch(resp);
-      }
-      catch (IOException e2) {
-         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
-      }
+      return resp;
    }
 
    private void setLastCommand(Command command) {
@@ -471,12 +500,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       return result;
    }
 
-   private void removeConsumerBrokerExchange(ConsumerId id) {
-      synchronized (consumerExchanges) {
-         consumerExchanges.remove(id);
-      }
-   }
-
    public void deliverMessage(MessageDispatch dispatch) {
       Message m = dispatch.getMessage();
       if (m != null) {
@@ -576,7 +599,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
-   public AMQConnectionContext initContext(ConnectionInfo info) {
+   public AMQConnectionContext initContext(ConnectionInfo info) throws Exception {
       WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
       // Older clients should have been defaulting this field to true.. but
       // they were not.
@@ -608,9 +631,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          info.setClientIp(getRemoteAddress());
       }
 
+      createInternalSession(info);
+
       return context;
    }
 
+   private void createInternalSession(ConnectionInfo info) throws Exception {
+      internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true);
+   }
+
    //raise the refCount of context
    public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) {
       this.context = existingContext;
@@ -663,17 +692,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       ActiveMQDestination dest = info.getDestination();
       if (dest.isQueue()) {
          SimpleString qName = OpenWireUtil.toCoreAddress(dest);
-         QueueBinding binding = (QueueBinding) protocolManager.getServer().getPostOffice().getBinding(qName);
+         QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
          if (binding == null) {
             if (getState().getInfo() != null) {
 
                CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
-               protocolManager.getServer().getSecurityStore().check(qName, checkType, this);
+               server.getSecurityStore().check(qName, checkType, this);
 
-               protocolManager.getServer().checkQueueCreationLimit(getUsername());
+               server.checkQueueCreationLimit(getUsername());
             }
             ConnectionInfo connInfo = getState().getInfo();
-            protocolManager.getServer().createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
+            server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
          }
 
          if (dest.isTemporary()) {
@@ -692,9 +721,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
 
    public void updateConsumer(ConsumerControl consumerControl) {
-      SessionId sessionId = consumerControl.getConsumerId().getParentId();
-      AMQSession amqSession = sessions.get(sessionId);
-      amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
+      ConsumerId consumerId = consumerControl.getConsumerId();
+      AMQConsumerBrokerExchange exchange = this.consumerExchanges.get(consumerId);
+      if (exchange != null) {
+         exchange.updateConsumerPrefetchSize(consumerControl.getPrefetch());
+      }
    }
 
    public void addConsumer(ConsumerInfo info) throws Exception {
@@ -707,7 +738,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
       SessionState ss = cs.getSessionState(sessionId);
       if (ss == null) {
-         throw new IllegalStateException(protocolManager.getServer() + " Cannot add a consumer to a session that had not been registered: " + sessionId);
+         throw new IllegalStateException(server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
       }
       // Avoid replaying dup commands
       if (!ss.getConsumerIds().contains(info.getConsumerId())) {
@@ -729,13 +760,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public void onSlowConsumer(ServerConsumer consumer) {
-         if (consumer instanceof AMQServerConsumer) {
-            AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer;
-            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getOpenwireDestination());
+         if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) {
+            AMQConsumer amqConsumer = (AMQConsumer)consumer.getProtocolData();
+            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination());
             ActiveMQMessage advisoryMessage = new ActiveMQMessage();
             try {
-               advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString());
-               protocolManager.fireAdvisory(context, topic, advisoryMessage, serverConsumer.getAmqConsumer().getId());
+               advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());
+               protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId());
             }
             catch (Exception e) {
                // TODO-NOW: LOGGING
@@ -758,9 +789,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    }
 
    public AMQSession addSession(SessionInfo ss, boolean internal) {
-      AMQSession amqSession = new AMQSession(getState().getInfo(), ss, protocolManager.getServer(), this, protocolManager.getScheduledPool(), protocolManager);
+      AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager.getScheduledPool());
       amqSession.initialize();
-      amqSession.setInternal(internal);
+
+      if (internal) {
+         amqSession.disableSecurity();
+      }
+
       sessions.put(ss.getSessionId(), amqSession);
       sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
       return amqSession;
@@ -780,10 +815,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void removeDestination(ActiveMQDestination dest) throws Exception {
       if (dest.isQueue()) {
          SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
-         protocolManager.getServer().destroyQueue(qName);
+         server.destroyQueue(qName);
       }
       else {
-         Bindings bindings = protocolManager.getServer().getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
+         Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
          Iterator<Binding> iterator = bindings.getBindings().iterator();
 
          while (iterator.hasNext()) {
@@ -815,7 +850,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    private void validateDestination(ActiveMQDestination destination) throws Exception {
       if (destination.isQueue()) {
          SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
-         BindingQueryResult result = protocolManager.getServer().bindingQuery(physicalName);
+         BindingQueryResult result = server.bindingQuery(physicalName);
          if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
          }
@@ -934,18 +969,70 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
-         protocolManager.removeSubscription(subInfo);
+         SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
+         server.destroyQueue(subQueueName);
+
          return null;
       }
 
       @Override
       public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-         protocolManager.rollbackTransaction(info);
-         TransactionId txId = info.getTransactionId();
-         txMap.remove(txId);
+         Transaction tx = lookupTX(info.getTransactionId(), null);
+         if (info.getTransactionId().isXATransaction() && tx == null) {
+            throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
+         }
+         else if(tx != null) {
+
+            AMQSession amqSession = (AMQSession)tx.getProtocolData();
+
+            if (amqSession != null) {
+               amqSession.getCoreSession().resetTX(tx);
+
+               try {
+                  returnReferences(tx, amqSession);
+               }
+               finally {
+                  amqSession.getCoreSession().resetTX(null);
+               }
+            }
+            tx.rollback();
+         }
+
+
          return null;
       }
 
+      /** Openwire will redeliver rolled back references.
+       *  We need to return those here. */
+      private void returnReferences(Transaction tx, AMQSession session) throws Exception {
+         if (session == null || session.isClosed()) {
+            return;
+         }
+
+         RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+
+         if (oper != null) {
+            List<MessageReference> ackRefs = oper.getReferencesToAcknowledge();
+
+            for (ListIterator<MessageReference> referenceIterator = ackRefs.listIterator(ackRefs.size()); referenceIterator.hasPrevious();){
+               MessageReference ref = referenceIterator.previous();
+
+               Long consumerID = ref.getConsumerId();
+
+               ServerConsumer consumer = null;
+               if (consumerID != null) {
+                  consumer = session.getCoreSession().locateConsumer(consumerID);
+               }
+
+               if (consumer != null) {
+                  System.out.println("Returning reference " + ref.getMessage());
+                  referenceIterator.remove();
+                  consumer.backToDelivering(ref);
+               }
+            }
+         }
+      }
+
       @Override
       public Response processShutdown(ShutdownInfo info) throws Exception {
          OpenWireConnection.this.shutdown(false);
@@ -989,44 +1076,137 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processBeginTransaction(TransactionInfo info) throws Exception {
-         TransactionId txId = info.getTransactionId();
+         final TransactionId txID = info.getTransactionId();
 
-         if (!txMap.containsKey(txId)) {
-            txMap.put(txId, info);
+         try {
+            internalSession.resetTX(null);
+            if (txID.isXATransaction()) {
+               Xid xid = OpenWireUtil.toXID(txID);
+               internalSession.xaStart(xid);
+            }
+            else {
+               Transaction transaction = internalSession.newTransaction();
+               txMap.put(txID, transaction);
+               transaction.addOperation(new TransactionOperationAbstract() {
+                  @Override
+                  public void afterCommit(Transaction tx) {
+                     txMap.remove(txID);
+                  }
+               });
+            }
+         }
+         finally {
+            internalSession.resetTX(null);
          }
          return null;
       }
 
       @Override
-      public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+      public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+         return processCommit(info, true);
+      }
+
+      private Response processCommit(TransactionInfo info, boolean onePhase) throws Exception {
+         TransactionId txID = info.getTransactionId();
+
+         Transaction tx = lookupTX(txID, null);
+
+         AMQSession session = (AMQSession)tx.getProtocolData();
+
+         tx.commit(onePhase);
+
+         return null;
       }
 
       @Override
-      public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-         try {
-            protocolManager.commitTransactionOnePhase(info);
-            TransactionId txId = info.getTransactionId();
-            txMap.remove(txId);
+      public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+         return processCommit(info, false);
+      }
+
+      @Override
+      public Response processForgetTransaction(TransactionInfo info) throws Exception {
+         TransactionId txID = info.getTransactionId();
+
+         if (txID.isXATransaction()) {
+            try {
+               Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+               internalSession.xaForget(xid);
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+               throw e;
+            }
          }
-         catch (Exception e) {
-            e.printStackTrace();
-            throw e;
+         else {
+            txMap.remove(txID);
          }
 
          return null;
       }
 
+
       @Override
-      public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-         protocolManager.commitTransactionTwoPhase(info);
-         TransactionId txId = info.getTransactionId();
-         txMap.remove(txId);
+      public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+         TransactionId txID = info.getTransactionId();
+
+         try {
+            if (txID.isXATransaction()) {
+               try {
+                  Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+                  internalSession.xaPrepare(xid);
+               }
+               catch (Exception e) {
+                  e.printStackTrace();
+                  throw e;
+               }
+            }
+            else {
+               Transaction tx = lookupTX(txID, null);
+               tx.prepare();
+            }
+         }
+         finally {
+            internalSession.resetTX(null);
+         }
+
+         return new IntegerResponse(XAResource.XA_RDONLY);
+      }
+
+
+      @Override
+      public Response processEndTransaction(TransactionInfo info) throws Exception {
+         TransactionId txID = info.getTransactionId();
+
+         if (txID.isXATransaction()) {
+            try {
+               Transaction tx = lookupTX(txID, null);
+               internalSession.resetTX(tx);
+               try {
+                  Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+                  internalSession.xaEnd(xid);
+               }
+               finally {
+                  internalSession.resetTX(null);
+               }
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+               throw e;
+            }
+         }
+         else {
+            txMap.remove(info);
+         }
 
          return null;
       }
 
       @Override
+      public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
+         throw new IllegalStateException("not implemented! ");
+      }
+
+      @Override
       public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
          //activemq5 keeps a var to remember only the faultTolerant flag
          //this can be sent over a reconnected transport as the first command
@@ -1058,31 +1238,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
 
       @Override
-      public Response processEndTransaction(TransactionInfo info) throws Exception {
-         protocolManager.endTransaction(info);
-         TransactionId txId = info.getTransactionId();
-
-         if (!txMap.containsKey(txId)) {
-            txMap.put(txId, info);
-         }
-         return null;
-      }
-
-      @Override
       public Response processFlush(FlushCommand arg0) throws Exception {
          throw new IllegalStateException("not implemented! ");
       }
 
       @Override
-      public Response processForgetTransaction(TransactionInfo info) throws Exception {
-         TransactionId txId = info.getTransactionId();
-         txMap.remove(txId);
-
-         protocolManager.forgetTransaction(info.getTransactionId());
-         return null;
-      }
-
-      @Override
       public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
          throw new IllegalStateException("not implemented! ");
       }
@@ -1097,15 +1257,33 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          AMQSession session = getSession(producerId.getParentId());
 
-         session.send(producerInfo, messageSend, sendProducerAck);
+         Transaction tx = lookupTX(messageSend.getTransactionId(), session);
+
+         session.getCoreSession().resetTX(tx);
+         try {
+            session.send(producerInfo, messageSend, sendProducerAck);
+         }
+         finally {
+            session.getCoreSession().resetTX(null);
+         }
+
          return null;
       }
 
 
       @Override
       public Response processMessageAck(MessageAck ack) throws Exception {
-         AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
-         consumerBrokerExchange.acknowledge(ack);
+         AMQSession session = getSession(ack.getConsumerId().getParentId());
+         Transaction tx = lookupTX(ack.getTransactionId(), session);
+         session.getCoreSession().resetTX(tx);
+
+         try {
+            AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
+            consumerBrokerExchange.acknowledge(ack);
+         }
+         finally {
+            session.getCoreSession().resetTX(null);
+         }
          return null;
       }
 
@@ -1130,13 +1308,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
 
       @Override
-      public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-         protocolManager.prepareTransaction(info);
-         //activemq needs a rdonly response
-         return new IntegerResponse(XAResource.XA_RDONLY);
-      }
-
-      @Override
       public Response processProducerAck(ProducerAck arg0) throws Exception {
          // a broker doesn't do producers.. this shouldn't happen
          return null;
@@ -1186,15 +1357,45 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          ConsumerInfo info = consumerState.getInfo();
          info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
 
-         AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
+         AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.remove(id);
 
          consumerBrokerExchange.removeConsumer();
 
-         removeConsumerBrokerExchange(id);
+         return null;
+      }
+
+   }
 
+   private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
+      if (txID == null) {
          return null;
       }
 
+      Xid xid = null;
+      Transaction transaction;
+      if (txID.isXATransaction()) {
+         xid = OpenWireUtil.toXID(txID);
+         transaction = server.getResourceManager().getTransaction(xid);
+      }
+      else {
+         transaction = txMap.get(txID);
+      }
+
+      if (transaction == null) {
+         throw new IllegalStateException("cannot find transactionInfo::" + txID + " xid=" + xid);
+      }
+
+      if (session != null && transaction.getProtocolData() != session) {
+         transaction.setProtocolData(session);
+      }
+
+      return transaction;
+   }
+
+   public static XAException newXAException(String s, int errorCode) {
+      XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode);
+      xaException.errorCode = errorCode;
+      return xaException;
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 89f71ed..b0a6d46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index bbbb696..c87fbea 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -33,7 +33,6 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
@@ -101,10 +100,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    private String brokerName;
 
-   // Clebert: Artemis already has a Resource Manager. Need to remove this..
-   //          The TransactionID extends XATransactionID, so all we need is to convert the XID here
-   private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
-
    private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
 
    private final LinkedList<TopologyMember> members = new LinkedList<>();
@@ -140,7 +135,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    }
 
    public OpenWireFormat getNewWireFormat() {
-      return (OpenWireFormat)wireFactory.createWireFormat();
+      return (OpenWireFormat) wireFactory.createWireFormat();
    }
 
    @Override
@@ -156,9 +151,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       }
    }
 
-
-   public void removeConnection(ConnectionInfo info,
-                                Throwable error) throws InvalidClientIDException {
+   public void removeConnection(ConnectionInfo info, Throwable error) throws InvalidClientIDException {
       synchronized (clientIdSet) {
          String clientId = info.getClientId();
          if (clientId != null) {
@@ -176,7 +169,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       }
    }
 
-
    public ScheduledExecutorService getScheduledPool() {
       return scheduledPool;
    }
@@ -223,7 +215,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
-      OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf);
+      OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf);
       owConn.sendHandshake();
 
       // TODO CLEBERT What is this constant here? we should get it from TTL initial pings
@@ -323,7 +315,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
          fireAdvisory(context, topic, copy);
 
          // init the conn
-         context.getConnection().addSessions( context.getConnectionState().getSessionIds());
+         context.getConnection().addSessions(context.getConnectionState().getSessionIds());
       }
    }
 
@@ -343,9 +335,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
     * See AdvisoryBroker.fireAdvisory()
     */
    public void fireAdvisory(AMQConnectionContext context,
-                             ActiveMQTopic topic,
-                             Command command,
-                             ConsumerId targetConsumerId) throws Exception {
+                            ActiveMQTopic topic,
+                            Command command,
+                            ConsumerId targetConsumerId) throws Exception {
       ActiveMQMessage advisoryMessage = new ActiveMQMessage();
       advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
       String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
@@ -448,55 +440,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    public boolean isStopping() {
       return false;
    }
-   public void endTransaction(TransactionInfo info) throws Exception {
-      AMQSession txSession = transactions.get(info.getTransactionId());
-
-      if (txSession != null) {
-         txSession.endTransaction(info);
-      }
-   }
-
-   public void commitTransactionOnePhase(TransactionInfo info) throws Exception {
-      AMQSession txSession = transactions.get(info.getTransactionId());
-
-      if (txSession != null) {
-         txSession.commitOnePhase(info);
-      }
-      transactions.remove(info.getTransactionId());
-   }
-
-   public void prepareTransaction(TransactionInfo info) throws Exception {
-      XATransactionId xid = (XATransactionId) info.getTransactionId();
-      AMQSession txSession = transactions.get(xid);
-      if (txSession != null) {
-         txSession.prepareTransaction(xid);
-      }
-   }
-
-   public void commitTransactionTwoPhase(TransactionInfo info) throws Exception {
-      XATransactionId xid = (XATransactionId) info.getTransactionId();
-      AMQSession txSession = transactions.get(xid);
-      if (txSession != null) {
-         txSession.commitTwoPhase(xid);
-      }
-      transactions.remove(xid);
-   }
-
-   public void rollbackTransaction(TransactionInfo info) throws Exception {
-      AMQSession txSession = transactions.get(info.getTransactionId());
-      if (txSession != null) {
-         txSession.rollback(info);
-      }
-      else if (info.getTransactionId().isLocalTransaction()) {
-         //during a broker restart, recovered local transaction may not be registered
-         //in that case we ignore and let the tx removed silently by connection.
-         //see AMQ1925Test.testAMQ1925_TXBegin
-      }
-      else {
-         throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
-      }
-      transactions.remove(info.getTransactionId());
-   }
 
    public boolean validateUser(String login, String passcode) {
       boolean validated = true;
@@ -510,26 +453,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       return validated;
    }
 
-   public void forgetTransaction(TransactionId xid) throws Exception {
-      AMQSession txSession = transactions.get(xid);
-      if (txSession != null) {
-         txSession.forget(xid);
-      }
-      transactions.remove(xid);
-   }
-
-   /**
-    * TODO: remove this, use the regular ResourceManager from the Server's
-    */
-   public void registerTx(TransactionId txId, AMQSession amqSession) {
-      transactions.put(txId, amqSession);
-   }
-
-   public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
-      SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
-      server.destroyQueue(subQueueName);
-   }
-
    public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
       BrokerInfo brokerInfo = new BrokerInfo();
       brokerInfo.setBrokerName(getBrokerName());
@@ -543,14 +466,26 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       connection.dispatch(brokerInfo);
    }
 
+   /**
+    * URI property
+    */
+   @SuppressWarnings("unused")
    public void setRebalanceClusterClients(boolean rebalance) {
       this.rebalanceClusterClients = rebalance;
    }
 
+   /**
+    * URI property
+    */
+   @SuppressWarnings("unused")
    public boolean isRebalanceClusterClients() {
       return this.rebalanceClusterClients;
    }
 
+   /**
+    * URI property
+    */
+   @SuppressWarnings("unused")
    public void setUpdateClusterClients(boolean updateClusterClients) {
       this.updateClusterClients = updateClusterClients;
    }
@@ -559,10 +494,18 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       return this.updateClusterClients;
    }
 
+   /**
+    * URI property
+    */
+   @SuppressWarnings("unused")
    public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
       this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
    }
 
+   /**
+    * URI property
+    */
+   @SuppressWarnings("unused")
    public boolean isUpdateClusterClientsOnRemove() {
       return this.updateClusterClientsOnRemove;
    }
@@ -571,10 +514,4 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       this.brokerName = name;
    }
 
-   public static XAException newXAException(String s, int errorCode) {
-      XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode);
-      xaException.errorCode = errorCode;
-      return xaException;
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
deleted file mode 100644
index 4513eb3..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.ByteSequence;
-
-public class OpenWireUtil {
-
-   public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
-      ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
-
-      buffer.writeBytes(bytes.data, bytes.offset, bytes.length);
-      return buffer;
-   }
-
-   public static SimpleString toCoreAddress(ActiveMQDestination dest) {
-      if (dest.isQueue()) {
-         return new SimpleString("jms.queue." + dest.getPhysicalName());
-      }
-      else {
-         return new SimpleString("jms.topic." + dest.getPhysicalName());
-      }
-   }
-
-   /**
-    * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
-    * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
-    * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
-    * consumer
-    */
-   public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
-      String address = message.getAddress().toString();
-      String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", "");
-      if (actualDestination.isQueue()) {
-         return new ActiveMQQueue(strippedAddress);
-      }
-      else {
-         return new ActiveMQTopic(strippedAddress);
-      }
-   }
-
-   /*
-    *This util converts amq wildcards to compatible core wildcards
-    *The conversion is like this:
-    *AMQ * wildcard --> Core * wildcard (no conversion)
-    *AMQ > wildcard --> Core # wildcard
-    */
-   public static String convertWildcard(String physicalName) {
-      return physicalName.replaceAll("(\\.>)+", ".#");
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
index 56b4b6d..5b9d72e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
@@ -48,7 +48,7 @@ public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchang
    public void acknowledge(MessageAck ack) throws Exception {
       AMQConsumer amqConsumer = consumerMap.get(ack.getDestination());
       if (amqConsumer != null) {
-         amqSession.acknowledge(ack, amqConsumer);
+         amqConsumer.acknowledge(ack);
       }
    }
 
@@ -58,4 +58,11 @@ public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchang
          amqConsumer.removeConsumer();
       }
    }
+
+   @Override
+   public void updateConsumerPrefetchSize(int prefetch) {
+      for (AMQConsumer amqConsumer : consumerMap.values()) {
+         amqConsumer.setPrefetchSize(prefetch);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index ef9b2a8..e65dbb8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -17,10 +17,8 @@
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -30,11 +28,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -42,7 +43,6 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQConsumer {
@@ -50,11 +50,10 @@ public class AMQConsumer {
    private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
-   private long nativeId = -1;
+   private ServerConsumer serverConsumer;
 
    private int prefetchSize;
-   private AtomicInteger windowAvailable;
-   private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<>();
+   private AtomicInteger currentWindow;
    private long messagePullSequence = 0;
    private MessagePullHandler messagePullHandler;
 
@@ -67,20 +66,13 @@ public class AMQConsumer {
       this.info = info;
       this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
-      this.windowAvailable = new AtomicInteger(prefetchSize);
+      this.currentWindow = new AtomicInteger(prefetchSize);
       if (prefetchSize == 0) {
          messagePullHandler = new MessagePullHandler();
       }
    }
 
    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
-      this.nativeId = nativeId;
-      AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener);
-      serverConsumer.setAmqConsumer(this);
-   }
-
-
-   private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
 
       SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
 
@@ -93,13 +85,13 @@ public class AMQConsumer {
 
          SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
 
-         AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
+         serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
-         return serverConsumer;
       }
       else {
-         SimpleString queueName = new SimpleString("jms.queue." + physicalName);
-         AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+         SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
+         session.getCoreServer().getJMSQueueCreator().create(queueName);
+         serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
          AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
          if (addrSettings != null) {
@@ -113,10 +105,10 @@ public class AMQConsumer {
             }
          }
 
-         return serverConsumer;
-
       }
 
+      serverConsumer.setProtocolData(this);
+
    }
 
    private SimpleString createTopicSubscription(boolean isDurable,
@@ -167,12 +159,6 @@ public class AMQConsumer {
       return queueName;
    }
 
-
-
-   public long getNativeId() {
-      return this.nativeId;
-   }
-
    public ConsumerId getId() {
       return info.getConsumerId();
    }
@@ -182,16 +168,17 @@ public class AMQConsumer {
    }
 
    public void acquireCredit(int n) throws Exception {
-      boolean promptDelivery = windowAvailable.get() == 0;
-      if (windowAvailable.get() < prefetchSize) {
-         this.windowAvailable.addAndGet(n);
-      }
+      int oldwindow = currentWindow.getAndAdd(n);
+
+      boolean promptDelivery = oldwindow < prefetchSize;
+
       if (promptDelivery) {
-         session.getCoreSession().promptDelivery(nativeId);
+         serverConsumer.promptDelivery();
       }
+
    }
 
-   public int handleDeliver(ServerMessage message, int deliveryCount) {
+   public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) {
       MessageDispatch dispatch;
       try {
          if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
@@ -200,9 +187,9 @@ public class AMQConsumer {
          //decrement deliveryCount as AMQ client tends to add 1.
          dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
          int size = dispatch.getMessage().getSize();
-         this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size));
+         reference.setProtocolData(dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
-         windowAvailable.decrementAndGet();
+         currentWindow.decrementAndGet();
          return size;
       }
       catch (IOException e) {
@@ -218,114 +205,59 @@ public class AMQConsumer {
       md.setConsumerId(getId());
       md.setDestination(openwireDestination);
       session.deliverMessage(md);
-      windowAvailable.decrementAndGet();
    }
 
+   /** The acknowledgement in openwire is done based on intervals.
+    *  We will iterate through the list of delivering messages at {@link ServerConsumer#getDeliveringReferencesBasedOnProtocol(boolean, Object, Object)}
+    *  and add those to the Transaction.
+    *  Notice that we will start a new transaction on the cases where there is no transaction. */
    public void acknowledge(MessageAck ack) throws Exception {
+
+
       MessageId first = ack.getFirstMessageId();
-      MessageId lastm = ack.getLastMessageId();
-      TransactionId tid = ack.getTransactionId();
-      boolean isLocalTx = (tid != null) && tid.isLocalTransaction();
-      boolean single = lastm.equals(first);
-
-      MessageInfo mi = null;
-      int n = 0;
-
-      if (ack.isIndividualAck()) {
-         Iterator<MessageInfo> iter = deliveringRefs.iterator();
-         while (iter.hasNext()) {
-            mi = iter.next();
-            if (mi.amqId.equals(lastm)) {
-               n++;
-               if (!isLocalTx) {
-                  iter.remove();
-                  session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
-               }
-               else {
-                  mi.setLocalAcked(true);
-               }
-               if (tid == null) {
-                  session.getCoreSession().commit();
-               }
-               break;
-            }
-         }
+      MessageId last = ack.getLastMessageId();
+
+      if (first == null) {
+         first = last;
       }
-      else if (ack.isRedeliveredAck()) {
-         //client tells that this message is for redlivery.
-         //do nothing until poisoned.
-         n = ack.getMessageCount();
+
+      boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only, nothing to be acked, we just remove the lists
+
+      if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) {
+         removeReferences = false;
       }
-      else if (ack.isPoisonAck()) {
-         //send to dlq
-         Iterator<MessageInfo> iter = deliveringRefs.iterator();
-         boolean firstFound = false;
-         while (iter.hasNext()) {
-            mi = iter.next();
-            if (mi.amqId.equals(first)) {
-               n++;
-               iter.remove();
-               session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause());
-               session.getCoreSession().commit();
-               if (single) {
-                  break;
-               }
-               firstFound = true;
-            }
-            else if (firstFound || first == null) {
-               n++;
-               iter.remove();
-               session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause());
-               session.getCoreSession().commit();
-               if (mi.amqId.equals(lastm)) {
-                  break;
-               }
-            }
+
+      List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
+
+      acquireCredit(ack.getMessageCount());
+
+      if (removeReferences) {
+
+         Transaction originalTX = session.getCoreSession().getCurrentTransaction();
+         Transaction transaction;
+
+         if (originalTX == null) {
+            transaction = session.getCoreSession().newTransaction();
          }
-      }
-      else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
-         //ToDo: implement with tests
-         n = ack.getMessageCount();
-      }
-      else {
-         Iterator<MessageInfo> iter = deliveringRefs.iterator();
-         boolean firstFound = false;
-         while (iter.hasNext()) {
-            MessageInfo ami = iter.next();
-            if (ami.amqId.equals(first)) {
-               n++;
-               if (!isLocalTx) {
-                  iter.remove();
-               }
-               else {
-                  ami.setLocalAcked(true);
-               }
-               if (single) {
-                  mi = ami;
-                  break;
-               }
-               firstFound = true;
+         else {
+            transaction = originalTX;
+         }
+
+         if (ack.isIndividualAck() || ack.isStandardAck()) {
+            for (MessageReference ref : ackList) {
+               ref.acknowledge(transaction);
             }
-            else if (firstFound || first == null) {
-               n++;
-               if (!isLocalTx) {
-                  iter.remove();
-               }
-               else {
-                  ami.setLocalAcked(true);
-               }
-               if (ami.amqId.equals(lastm)) {
-                  mi = ami;
-                  break;
-               }
+         }
+         else if (ack.isPoisonAck()) {
+            for (MessageReference ref : ackList) {
+               ref.getQueue().sendToDeadLetterAddress(transaction, ref);
             }
          }
-         if (mi != null && !isLocalTx) {
-            session.getCoreSession().acknowledge(nativeId, mi.nativeId);
+
+         if (originalTX == null) {
+            transaction.commit(true);
          }
       }
-
-      acquireCredit(n);
    }
 
    public void browseFinished() {
@@ -337,61 +269,23 @@ public class AMQConsumer {
       session.deliverMessage(md);
    }
 
-   //this is called before session commit a local tx
-   public void finishTx() throws Exception {
-      MessageInfo lastMi = null;
-
-      MessageInfo mi = null;
-      Iterator<MessageInfo> iter = deliveringRefs.iterator();
-      while (iter.hasNext()) {
-         mi = iter.next();
-         if (mi.isLocalAcked()) {
-            iter.remove();
-            lastMi = mi;
-         }
-      }
-
-      if (lastMi != null) {
-         session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
-      }
-   }
-
-   public void rollbackTx(Set<Long> acked) throws Exception {
-      MessageInfo lastMi = null;
-
-      MessageInfo mi = null;
-      Iterator<MessageInfo> iter = deliveringRefs.iterator();
-      while (iter.hasNext()) {
-         mi = iter.next();
-         if (mi.isLocalAcked()) {
-            acked.add(mi.nativeId);
-            lastMi = mi;
-         }
-      }
-
-      if (lastMi != null) {
-         session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
-      }
-   }
-
    public ConsumerInfo getInfo() {
       return info;
    }
 
    public boolean hasCredits() {
-      return windowAvailable.get() > 0;
+      return currentWindow.get() > 0;
    }
 
    public void processMessagePull(MessagePull messagePull) throws Exception {
-      windowAvailable.incrementAndGet();
-
+      //      windowAvailable.incrementAndGet();
       if (messagePullHandler != null) {
          messagePullHandler.nextSequence(messagePullSequence++, messagePull.getTimeout());
       }
    }
 
    public void removeConsumer() throws Exception {
-      session.removeConsumer(nativeId);
+      serverConsumer.close(false);
    }
 
    public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
@@ -400,10 +294,10 @@ public class AMQConsumer {
 
    public void setPrefetchSize(int prefetchSize) {
       this.prefetchSize = prefetchSize;
-      this.windowAvailable.set(prefetchSize);
+      this.currentWindow.set(prefetchSize);
       this.info.setPrefetchSize(prefetchSize);
       if (this.prefetchSize > 0) {
-         session.getCoreSession().promptDelivery(nativeId);
+         serverConsumer.promptDelivery();
       }
    }
 
@@ -421,7 +315,7 @@ public class AMQConsumer {
          this.next = next;
          this.timeout = timeout;
          latch = new CountDownLatch(1);
-         session.getCoreSession().forceConsumerDelivery(nativeId, messagePullSequence);
+         serverConsumer.forceDelivery(messagePullSequence);
          //if we are 0 timeout or less we need to wait to get either the forced message or a real message.
          if (timeout <= 0) {
             latch.await(10, TimeUnit.SECONDS);
@@ -434,7 +328,6 @@ public class AMQConsumer {
 
       public boolean checkForcedConsumer(ServerMessage message) {
          if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
-            System.out.println("MessagePullHandler.checkForcedConsumer");
             if (next >= 0) {
                if (timeout <= 0) {
                   latch.countDown();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index 21a45b1..0132465 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -32,4 +32,6 @@ public abstract class AMQConsumerBrokerExchange {
    public abstract void processMessagePull(MessagePull messagePull) throws Exception;
 
    public abstract void removeConsumer() throws Exception;
+
+   public abstract void updateConsumerPrefetchSize(int prefetch);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
deleted file mode 100644
index 2f9d0bc..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.List;
-
-import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
-public class AMQServerConsumer extends ServerConsumerImpl {
-
-   // TODO-NOW: remove this once unified
-   AMQConsumer amqConsumer;
-
-   public AMQConsumer getAmqConsumer() {
-      return amqConsumer;
-   }
-
-   /** TODO-NOW: remove this once unified */
-   public void setAmqConsumer(AMQConsumer amqConsumer) {
-      this.amqConsumer = amqConsumer;
-   }
-
-   public AMQServerConsumer(long consumerID,
-                            AMQServerSession serverSession,
-                            QueueBinding binding,
-                            Filter filter,
-                            boolean started,
-                            boolean browseOnly,
-                            StorageManager storageManager,
-                            SessionCallback callback,
-                            boolean preAcknowledge,
-                            boolean strictUpdateDeliveryCount,
-                            ManagementService managementService,
-                            boolean supportLargeMessage,
-                            Integer credits,
-                            final ActiveMQServer server) throws Exception {
-      super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
-   }
-
-   public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
-      synchronized (this.deliveringRefs) {
-         for (MessageReference ref : refs) {
-            ref.incrementDeliveryCount();
-            deliveringRefs.add(ref);
-         }
-         //adjust the order. Suppose deliveringRefs has 2 existing
-         //refs m1, m2, and refs has 3 m3, m4, m5
-         //new order must be m3, m4, m5, m1, m2
-         if (refs.size() > 0) {
-            long first = refs.get(0).getMessage().getMessageID();
-            MessageReference m = deliveringRefs.peek();
-            while (m.getMessage().getMessageID() != first) {
-               deliveringRefs.poll();
-               deliveringRefs.add(m);
-               m = deliveringRefs.peek();
-            }
-         }
-      }
-   }
-
-   public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception {
-      MessageReference ref = removeReferenceByID(mid);
-
-      if (ref == null) {
-         throw new IllegalStateException("Cannot find ref to ack " + mid);
-      }
-
-      ServerMessage coreMsg = ref.getMessage();
-      coreMsg.putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, cause.toString());
-
-      QueueImpl queue = (QueueImpl) ref.getQueue();
-      synchronized (queue) {
-         queue.sendToDeadLetterAddress(ref);
-         queue.decDelivering();
-      }
-   }
-
-}


Mime
View raw message