activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [41/42] activemq-artemis git commit: ARTEMIS-463 More simplifications on the openwire head https://issues.apache.org/jira/browse/ARTEMIS-463
Date Mon, 04 Apr 2016 16:09:50 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
deleted file mode 100644
index 2f9d0bc..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.List;
-
-import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
-public class AMQServerConsumer extends ServerConsumerImpl {
-
-   // TODO-NOW: remove this once unified
-   AMQConsumer amqConsumer;
-
-   public AMQConsumer getAmqConsumer() {
-      return amqConsumer;
-   }
-
-   /** TODO-NOW: remove this once unified */
-   public void setAmqConsumer(AMQConsumer amqConsumer) {
-      this.amqConsumer = amqConsumer;
-   }
-
-   public AMQServerConsumer(long consumerID,
-                            AMQServerSession serverSession,
-                            QueueBinding binding,
-                            Filter filter,
-                            boolean started,
-                            boolean browseOnly,
-                            StorageManager storageManager,
-                            SessionCallback callback,
-                            boolean preAcknowledge,
-                            boolean strictUpdateDeliveryCount,
-                            ManagementService managementService,
-                            boolean supportLargeMessage,
-                            Integer credits,
-                            final ActiveMQServer server) throws Exception {
-      super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
-   }
-
-   public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
-      synchronized (this.deliveringRefs) {
-         for (MessageReference ref : refs) {
-            ref.incrementDeliveryCount();
-            deliveringRefs.add(ref);
-         }
-         //adjust the order. Suppose deliveringRefs has 2 existing
-         //refs m1, m2, and refs has 3 m3, m4, m5
-         //new order must be m3, m4, m5, m1, m2
-         if (refs.size() > 0) {
-            long first = refs.get(0).getMessage().getMessageID();
-            MessageReference m = deliveringRefs.peek();
-            while (m.getMessage().getMessageID() != first) {
-               deliveringRefs.poll();
-               deliveringRefs.add(m);
-               m = deliveringRefs.peek();
-            }
-         }
-      }
-   }
-
-   public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception {
-      MessageReference ref = removeReferenceByID(mid);
-
-      if (ref == null) {
-         throw new IllegalStateException("Cannot find ref to ack " + mid);
-      }
-
-      ServerMessage coreMsg = ref.getMessage();
-      coreMsg.putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, cause.toString());
-
-      QueueImpl queue = (QueueImpl) ref.getQueue();
-      synchronized (queue) {
-         queue.sendToDeadLetterAddress(ref);
-         queue.decDelivering();
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
deleted file mode 100644
index 3f0259d..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.protocol.openwire.AMQTransactionImpl;
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.RefsOperation;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.transaction.ResourceManager;
-import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUID;
-
-public class AMQServerSession extends ServerSessionImpl {
-
-   private boolean internal;
-
-   public AMQServerSession(String name,
-                           String username,
-                           String password,
-                           int minLargeMessageSize,
-                           boolean autoCommitSends,
-                           boolean autoCommitAcks,
-                           boolean preAcknowledge,
-                           boolean persistDeliveryCountBeforeDelivery,
-                           boolean xa,
-                           RemotingConnection connection,
-                           StorageManager storageManager,
-                           PostOffice postOffice,
-                           ResourceManager resourceManager,
-                           SecurityStore securityStore,
-                           ManagementService managementService,
-                           ActiveMQServerImpl activeMQServerImpl,
-                           SimpleString managementAddress,
-                           SimpleString simpleString,
-                           SessionCallback callback,
-                           QueueCreator queueCreator,
-                           OperationContext context) throws Exception {
-      super(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, activeMQServerImpl, managementAddress, simpleString, callback, context, new AMQTransactionFactory(), queueCreator);
-   }
-
-   @Override
-   protected void doClose(final boolean failed) throws Exception {
-      Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
-      for (ServerConsumer consumer : consumersClone) {
-         AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
-         amqConsumer.setStarted(false);
-      }
-
-      synchronized (this) {
-         if (tx != null && tx.getXid() == null) {
-            ((AMQTransactionImpl) tx).setRollbackForClose();
-         }
-      }
-      super.doClose(failed);
-   }
-
-   public AtomicInteger getConsumerCredits(final long consumerID) {
-      ServerConsumer consumer = consumers.get(consumerID);
-
-      if (consumer == null) {
-         ActiveMQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID);
-
-         return null;
-      }
-
-      return ((ServerConsumerImpl) consumer).getAvailableCredits();
-   }
-
-   public void enableXA() throws Exception {
-      if (!this.xa) {
-         if (this.tx != null) {
-            //that's not expected, maybe a warning.
-            this.tx.rollback();
-            this.tx = null;
-         }
-
-         this.autoCommitAcks = false;
-         this.autoCommitSends = false;
-
-         this.xa = true;
-      }
-   }
-
-   public void enableTx() throws Exception {
-      if (this.xa) {
-         throw new IllegalStateException("Session is XA");
-      }
-
-      this.autoCommitAcks = false;
-      this.autoCommitSends = false;
-
-      if (this.tx != null) {
-         //that's not expected, maybe a warning.
-         this.tx.rollback();
-         this.tx = null;
-      }
-
-      this.tx = newTransaction();
-   }
-
-   //amq specific behavior
-
-   // TODO: move this to AMQSession
-   public void amqRollback(Set<Long> acked) throws Exception {
-      if (tx == null) {
-         // Might be null if XA
-
-         tx = newTransaction();
-      }
-
-      RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
-
-      if (oper != null) {
-         List<MessageReference> ackRefs = oper.getReferencesToAcknowledge();
-         Map<Long, List<MessageReference>> toAcks = new HashMap<>();
-         for (MessageReference ref : ackRefs) {
-            Long consumerId = ref.getConsumerId();
-
-            if (this.consumers.containsKey(consumerId)) {
-               if (acked.contains(ref.getMessage().getMessageID())) {
-                  List<MessageReference> ackList = toAcks.get(consumerId);
-                  if (ackList == null) {
-                     ackList = new ArrayList<>();
-                     toAcks.put(consumerId, ackList);
-                  }
-                  ackList.add(ref);
-               }
-            }
-            else {
-               //consumer must have been closed, cancel to queue
-               ref.getQueue().cancel(tx, ref);
-            }
-         }
-         //iterate consumers
-         if (toAcks.size() > 0) {
-            Iterator<Entry<Long, List<MessageReference>>> iter = toAcks.entrySet().iterator();
-            while (iter.hasNext()) {
-               Entry<Long, List<MessageReference>> entry = iter.next();
-               ServerConsumer consumer = consumers.get(entry.getKey());
-               ((AMQServerConsumer) consumer).amqPutBackToDeliveringList(entry.getValue());
-            }
-         }
-      }
-
-      tx.rollback();
-
-      if (xa) {
-         tx = null;
-      }
-      else {
-         tx = newTransaction();
-      }
-
-   }
-
-   /**
-    * The failed flag is used here to control delivery count.
-    * If set to true the delivery count won't decrement.
-    */
-   public void amqCloseConsumer(long consumerID, boolean failed) throws Exception {
-      final ServerConsumer consumer = consumers.get(consumerID);
-
-      if (consumer != null) {
-         consumer.close(failed);
-      }
-      else {
-         ActiveMQServerLogger.LOGGER.cannotFindConsumer(consumerID);
-      }
-   }
-
-   @Override
-   public ServerConsumer createConsumer(final long consumerID,
-                                        final SimpleString queueName,
-                                        final SimpleString filterString,
-                                        final boolean browseOnly,
-                                        final boolean supportLargeMessage,
-                                        final Integer credits) throws Exception {
-      if (this.internal) {
-         // Clebert TODO: PQP!!!!!!!!!!!!!!!!!!!!
-
-         //internal sessions don't check security:: Why??? //// what's the reason for that? Where is a link?
-
-         Binding binding = postOffice.getBinding(queueName);
-
-         if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
-            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
-         }
-
-         Filter filter = FilterImpl.createFilter(filterString);
-
-         ServerConsumer consumer = newConsumer(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
-         consumers.put(consumer.getID(), consumer);
-
-         if (!browseOnly) {
-            TypedProperties props = new TypedProperties();
-
-            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
-
-            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
-
-            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
-
-            props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
-
-            Queue theQueue = (Queue) binding.getBindable();
-
-            props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
-
-            // HORNETQ-946
-            props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(username));
-
-            props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(this.remotingConnection.getRemoteAddress()));
-
-            props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(name));
-
-            if (filterString != null) {
-               props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
-            }
-
-            Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props);
-
-            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQServerLogger.LOGGER.debug("Session with user=" + username + ", connection=" + this.remotingConnection + " created a consumer on queue " + queueName + ", filter = " + filterString);
-            }
-
-            managementService.sendNotification(notification);
-         }
-
-         return consumer;
-      }
-      else {
-         return super.createConsumer(consumerID, queueName, filterString, browseOnly, supportLargeMessage, credits);
-      }
-   }
-
-   @Override
-   public Queue createQueue(final SimpleString address,
-                            final SimpleString name,
-                            final SimpleString filterString,
-                            final boolean temporary,
-                            final boolean durable) throws Exception {
-      if (!this.internal) {
-         return super.createQueue(address, name, filterString, temporary, durable);
-      }
-
-      Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
-
-      if (temporary) {
-         // Temporary queue in core simply means the queue will be deleted if
-         // the remoting connection
-         // dies. It does not mean it will get deleted automatically when the
-         // session is closed.
-         // It is up to the user to delete the queue when finished with it
-
-         TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, name);
-
-         remotingConnection.addCloseListener(cleaner);
-         remotingConnection.addFailureListener(cleaner);
-
-         tempQueueCleannerUppers.put(name, cleaner);
-      }
-
-      if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQServerLogger.LOGGER.debug("Queue " + name + " created on address " + name +
-                                              " with filter=" + filterString + " temporary = " +
-                                              temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
-      }
-
-      return queue;
-   }
-
-
-   // Clebert TODO: Get rid of these methods
-   @Override
-   protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
-      if (!this.internal) {
-         super.doSend(msg, direct);
-         return;
-      }
-
-      //bypass security check for internal sessions
-      if (tx == null || autoCommitSends) {
-      }
-      else {
-         routingContext.setTransaction(tx);
-      }
-
-      try {
-         postOffice.route(msg, getQueueCreator(), routingContext, direct);
-
-         Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
-
-         if (value == null) {
-            targetAddressInfos.put(msg.getAddress(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
-         }
-         else {
-            value.setA(msg.getUserID());
-            value.getB().incrementAndGet();
-         }
-      }
-      finally {
-         routingContext.clear();
-      }
-   }
-
-   @Override
-   protected ServerConsumer newConsumer(long consumerID,
-                                        ServerSessionImpl serverSessionImpl,
-                                        QueueBinding binding,
-                                        Filter filter,
-                                        boolean started2,
-                                        boolean browseOnly,
-                                        StorageManager storageManager2,
-                                        SessionCallback callback2,
-                                        boolean preAcknowledge2,
-                                        boolean strictUpdateDeliveryCount2,
-                                        ManagementService managementService2,
-                                        boolean supportLargeMessage,
-                                        Integer credits) throws Exception {
-      return new AMQServerConsumer(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, this.server);
-   }
-
-   public AMQServerConsumer getConsumer(long nativeId) {
-      return (AMQServerConsumer) this.consumers.get(nativeId);
-   }
-
-   public void setInternal(boolean internal) {
-      this.internal = internal;
-   }
-
-   public boolean isInternal() {
-      return this.internal;
-   }
-
-   public void moveToDeadLetterAddress(long consumerId, long mid, Throwable cause) throws Exception {
-      AMQServerConsumer consumer = getConsumer(consumerId);
-      consumer.moveToDeadLetterAddress(mid, cause);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
deleted file mode 100644
index a6ca4a0..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.core.server.QueueCreator;
-import org.apache.activemq.artemis.core.server.ServerSessionFactory;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.transaction.ResourceManager;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
-public class AMQServerSessionFactory implements ServerSessionFactory {
-
-   private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory();
-
-   public static AMQServerSessionFactory getInstance() {
-      return singleInstance;
-   }
-
-   private AMQServerSessionFactory() {
-   }
-
-   @Override
-   public ServerSessionImpl createCoreSession(String name,
-                                              String username,
-                                              String password,
-                                              int minLargeMessageSize,
-                                              boolean autoCommitSends,
-                                              boolean autoCommitAcks,
-                                              boolean preAcknowledge,
-                                              boolean persistDeliveryCountBeforeDelivery,
-                                              boolean xa,
-                                              RemotingConnection connection,
-                                              StorageManager storageManager,
-                                              PostOffice postOffice,
-                                              ResourceManager resourceManager,
-                                              SecurityStore securityStore,
-                                              ManagementService managementService,
-                                              ActiveMQServerImpl activeMQServerImpl,
-                                              SimpleString managementAddress,
-                                              SimpleString simpleString,
-                                              SessionCallback callback,
-                                              QueueCreator queueCreator,
-                                              OperationContext context) throws Exception {
-      return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, activeMQServerImpl, managementAddress, simpleString, callback, queueCreator, context);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 4675dca..74dd951 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -17,13 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import javax.jms.ResourceAllocationException;
-import javax.transaction.xa.Xid;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,15 +26,15 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -48,43 +42,30 @@ import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
 
    // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
-   protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
+   protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
 
    private ConnectionInfo connInfo;
-   private AMQServerSession coreSession;
+   private ServerSession coreSession;
    private SessionInfo sessInfo;
    private ActiveMQServer server;
    private OpenWireConnection connection;
 
-   private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
-
    private AtomicBoolean started = new AtomicBoolean(false);
 
-   private TransactionId txId = null;
-
-   private boolean isTx;
-
    private final ScheduledExecutorService scheduledPool;
 
-   private OpenWireProtocolManager manager;
-
    // The sessionWireformat used by the session
    // this object is meant to be used per thread / session
    // so we make a new one per AMQSession
@@ -94,20 +75,22 @@ public class AMQSession implements SessionCallback {
                      SessionInfo sessInfo,
                      ActiveMQServer server,
                      OpenWireConnection connection,
-                     ScheduledExecutorService scheduledPool,
-                     OpenWireProtocolManager manager) {
+                     ScheduledExecutorService scheduledPool) {
       this.connInfo = connInfo;
       this.sessInfo = sessInfo;
 
       this.server = server;
       this.connection = connection;
       this.scheduledPool = scheduledPool;
-      this.manager = manager;
       OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
 
       this.converter = new OpenWireMessageConverter(marshaller.copy());
    }
 
+   public boolean isClosed() {
+      return coreSession.isClosed();
+   }
+
    public OpenWireMessageConverter getConverter() {
       return converter;
    }
@@ -122,7 +105,7 @@ public class AMQSession implements SessionCallback {
       // now
 
       try {
-         coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true);
+         coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true);
 
          long sessionId = sessInfo.getSessionId().getValue();
          if (sessionId == -1) {
@@ -136,8 +119,8 @@ public class AMQSession implements SessionCallback {
    }
 
    public List<AMQConsumer> createConsumer(ConsumerInfo info,
-                              AMQSession amqSession,
-                              SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
+                                           AMQSession amqSession,
+                                           SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
       //check destination
       ActiveMQDestination dest = info.getDestination();
       ActiveMQDestination[] dests = null;
@@ -147,7 +130,7 @@ public class AMQSession implements SessionCallback {
       else {
          dests = new ActiveMQDestination[]{dest};
       }
-//      Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
+
       List<AMQConsumer> consumersList = new java.util.LinkedList<>();
 
       for (ActiveMQDestination openWireDest : dests) {
@@ -157,9 +140,9 @@ public class AMQSession implements SessionCallback {
          }
          AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
 
-         consumer.init(slowConsumerDetectionListener, idGenerator.generateID());
+         long nativeID = consumerIDGenerator.generateID();
+         consumer.init(slowConsumerDetectionListener, nativeID);
          consumersList.add(consumer);
-         consumers.put(consumer.getNativeId(), consumer);
       }
 
       return consumersList;
@@ -180,7 +163,7 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public void browserFinished(ServerConsumer consumer) {
-      AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer();
+      AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
       if (theConsumer != null) {
          theConsumer.browseFinished();
       }
@@ -204,13 +187,20 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) {
-      AMQConsumer consumer = consumers.get(consumerID.getID());
-      return consumer.handleDeliver(message, deliveryCount);
+   public int sendMessage(MessageReference reference,
+                          ServerMessage message,
+                          ServerConsumer consumer,
+                          int deliveryCount) {
+      AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
+      return theConsumer.handleDeliver(reference, message, deliveryCount);
    }
 
    @Override
-   public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) {
+   public int sendLargeMessage(MessageReference reference,
+                               ServerMessage message,
+                               ServerConsumer consumerID,
+                               long bodySize,
+                               int deliveryCount) {
       // TODO Auto-generated method stub
       return 0;
    }
@@ -231,16 +221,15 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
-   public boolean hasCredits(ServerConsumer consumerID) {
+   public boolean hasCredits(ServerConsumer consumer) {
 
-      AMQConsumer amqConsumer;
+      AMQConsumer amqConsumer = null;
 
-      amqConsumer = consumers.get(consumerID.getID());
-
-      if (amqConsumer != null) {
-         return amqConsumer.hasCredits();
+      if (consumer.getProtocolData() != null) {
+         amqConsumer = (AMQConsumer) consumer.getProtocolData();
       }
-      return false;
+
+      return amqConsumer != null && amqConsumer.hasCredits();
    }
 
    @Override
@@ -252,11 +241,6 @@ public class AMQSession implements SessionCallback {
    public void send(final ProducerInfo producerInfo,
                     final Message messageSend,
                     boolean sendProducerAck) throws Exception {
-      TransactionId tid = messageSend.getTransactionId();
-      if (tid != null) {
-         resetSessionTx(tid);
-      }
-
       messageSend.setBrokerInTime(System.currentTimeMillis());
 
       ActiveMQDestination destination = messageSend.getDestination();
@@ -376,7 +360,7 @@ public class AMQSession implements SessionCallback {
       }
    }
 
-   public AMQServerSession getCoreSession() {
+   public ServerSession getCoreSession() {
       return this.coreSession;
    }
 
@@ -384,160 +368,16 @@ public class AMQSession implements SessionCallback {
       return this.server;
    }
 
-   public void removeConsumer(long consumerId) throws Exception {
-      boolean failed = !(this.txId != null || this.isTx);
-
-      coreSession.amqCloseConsumer(consumerId, failed);
-      consumers.remove(consumerId);
-   }
-
    public WireFormat getMarshaller() {
       return this.connection.getMarshaller();
    }
 
-   public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception {
-      TransactionId tid = ack.getTransactionId();
-      if (tid != null) {
-         this.resetSessionTx(ack.getTransactionId());
-      }
-      consumer.acknowledge(ack);
-
-      if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE) {
-         this.coreSession.commit();
-      }
-   }
-
-   //AMQ session and transactions are create separately. Whether a session
-   //is transactional or not is known only when a TransactionInfo command
-   //comes in.
-   public void resetSessionTx(TransactionId xid) throws Exception {
-      if ((this.txId != null) && (!this.txId.equals(xid))) {
-         throw new IllegalStateException("Session already associated with a tx");
-      }
-
-      this.isTx = true;
-      if (this.txId == null) {
-         //now reset session
-         this.txId = xid;
-
-         if (xid.isXATransaction()) {
-            XATransactionId xaXid = (XATransactionId) xid;
-            coreSession.enableXA();
-            XidImpl coreXid = new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId());
-            coreSession.xaStart(coreXid);
-         }
-         else {
-            coreSession.enableTx();
-         }
-
-         this.manager.registerTx(this.txId, this);
-      }
-   }
-
-   private void checkTx(TransactionId inId) {
-      if (this.txId == null) {
-         throw new IllegalStateException("Session has no transaction associated with it");
-      }
-
-      if (!this.txId.equals(inId)) {
-         throw new IllegalStateException("Session already associated with another tx");
-      }
-
-      this.isTx = true;
-   }
-
-   public void endTransaction(TransactionInfo info) throws Exception {
-      checkTx(info.getTransactionId());
-
-      if (txId.isXATransaction()) {
-         XATransactionId xid = (XATransactionId) txId;
-         XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
-         this.coreSession.xaEnd(coreXid);
-      }
-   }
-
-   public void commitOnePhase(TransactionInfo info) throws Exception {
-      checkTx(info.getTransactionId());
-
-      if (txId.isXATransaction()) {
-         XATransactionId xid = (XATransactionId) txId;
-         XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
-         this.coreSession.xaCommit(coreXid, true);
-      }
-      else {
-         Iterator<AMQConsumer> iter = consumers.values().iterator();
-         while (iter.hasNext()) {
-            AMQConsumer consumer = iter.next();
-            consumer.finishTx();
-         }
-         this.coreSession.commit();
-      }
-
-      this.txId = null;
-   }
-
-   public void prepareTransaction(XATransactionId xid) throws Exception {
-      checkTx(xid);
-      XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
-      this.coreSession.xaPrepare(coreXid);
-   }
-
-   public void commitTwoPhase(XATransactionId xid) throws Exception {
-      checkTx(xid);
-      XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
-      this.coreSession.xaCommit(coreXid, false);
-
-      this.txId = null;
-   }
-
-   public void rollback(TransactionInfo info) throws Exception {
-      checkTx(info.getTransactionId());
-      if (this.txId.isXATransaction()) {
-         XATransactionId xid = (XATransactionId) txId;
-         XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
-         this.coreSession.xaRollback(coreXid);
-      }
-      else {
-         Iterator<AMQConsumer> iter = consumers.values().iterator();
-         Set<Long> acked = new HashSet<>();
-         while (iter.hasNext()) {
-            AMQConsumer consumer = iter.next();
-            consumer.rollbackTx(acked);
-         }
-         //on local rollback, amq broker doesn't do anything about the delivered
-         //messages, which stay at clients until next time
-         this.coreSession.amqRollback(acked);
-      }
-
-      this.txId = null;
-   }
-
-   public void recover(List<TransactionId> recovered) {
-      List<Xid> xids = this.coreSession.xaGetInDoubtXids();
-      for (Xid xid : xids) {
-         XATransactionId amqXid = new XATransactionId(xid);
-         recovered.add(amqXid);
-      }
-   }
-
-   public void forget(final TransactionId tid) throws Exception {
-      checkTx(tid);
-      XATransactionId xid = (XATransactionId) tid;
-      XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
-      this.coreSession.xaForget(coreXid);
-      this.txId = null;
-   }
-
    public ConnectionInfo getConnectionInfo() {
       return this.connInfo;
    }
 
-   public void setInternal(boolean internal) {
-      this.coreSession.setInternal(internal);
-   }
-
-   public boolean isInternal() {
-      return this.coreSession.isInternal();
+   public void disableSecurity() {
+      this.coreSession.disableSecurity();
    }
 
    public void deliverMessage(MessageDispatch dispatch) {
@@ -548,20 +388,6 @@ public class AMQSession implements SessionCallback {
       this.coreSession.close(false);
    }
 
-   public AMQConsumer getConsumer(Long coreConsumerId) {
-      return consumers.get(coreConsumerId);
-   }
-
-   public void updateConsumerPrefetchSize(ConsumerId consumerId, int prefetch) {
-      Iterator<AMQConsumer> iterator = consumers.values().iterator();
-      while (iterator.hasNext()) {
-         AMQConsumer consumer = iterator.next();
-         if (consumer.getId().equals(consumerId)) {
-            consumer.setPrefetchSize(prefetch);
-         }
-      }
-   }
-
    public OpenWireConnection getConnection() {
       return connection;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
index b29c448..e02638e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
@@ -40,6 +40,12 @@ public class AMQSingleConsumerBrokerExchange extends AMQConsumerBrokerExchange {
 
    @Override
    public void acknowledge(MessageAck ack) throws Exception {
-      amqSession.acknowledge(ack, consumer);
+      consumer.acknowledge(ack);
    }
+
+   @Override
+   public void updateConsumerPrefetchSize(int prefetch) {
+      consumer.setPrefetchSize(prefetch);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java
deleted file mode 100644
index 3a47333..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.protocol.openwire.AMQTransactionImpl;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.TransactionFactory;
-
-public class AMQTransactionFactory implements TransactionFactory {
-
-   @Override
-   public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) {
-      return new AMQTransactionImpl(xid, storageManager, timeoutSeconds);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java
deleted file mode 100644
index 005dd2e..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import org.apache.activemq.command.MessageId;
-
-public class MessageInfo {
-
-   public MessageId amqId;
-   public long nativeId;
-   public int size;
-   //mark message that is acked within a local tx
-   public boolean localAcked;
-
-   public MessageInfo(MessageId amqId, long nativeId, int size) {
-      this.amqId = amqId;
-      this.nativeId = nativeId;
-      this.size = size;
-   }
-
-   @Override
-   public String toString() {
-      return "native mid: " + this.nativeId + " amqId: " + amqId + " local acked: " + localAcked;
-   }
-
-   public void setLocalAcked(boolean ack) {
-      localAcked = ack;
-   }
-
-   public boolean isLocalAcked() {
-      return localAcked;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 0cad259..b3e52af 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -226,7 +226,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
          String name = UUIDGenerator.getInstance().generateStringUUID();
-         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, null, true);
+         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true);
          stompSession.setServerSession(session);
          sessions.put(connection.getID(), stompSession);
       }
@@ -239,7 +239,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
          String name = UUIDGenerator.getInstance().generateStringUUID();
-         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, null, true);
+         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true);
          stompSession.setServerSession(session);
          transactedSessions.put(txID, stompSession);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index a6cbe71..9b5c70d 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -118,7 +119,7 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
+   public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
       LargeServerMessageImpl largeMessage = null;
       ServerMessage newServerMessage = serverMessage;
       try {
@@ -207,7 +208,7 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
-   public int sendLargeMessage(ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount) {
+   public int sendLargeMessage(MessageReference ref, ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount) {
       return 0;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 1a9690f..de0a2fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -1301,7 +1301,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
    public TransportConfiguration[] getTransportConfigurations(final List<String> connectorNames) {
       TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
       int count = 0;
-      System.out.println(debugConnectors());
 
       for (String connectorName : connectorNames) {
          TransportConfiguration connector = getConnectorConfigurations().get(connectorName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 82b0e92..99e9160 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 
 public class PagedReferenceImpl implements PagedReference {
 
@@ -48,6 +49,18 @@ public class PagedReferenceImpl implements PagedReference {
 
    private boolean alreadyAcked;
 
+   private Object protocolData;
+
+   @Override
+   public Object getProtocolData() {
+      return protocolData;
+   }
+
+   @Override
+   public void setProtocolData(Object protocolData) {
+      this.protocolData = protocolData;
+   }
+
    @Override
    public ServerMessage getMessage() {
       return getPagedMessage().getMessage();
@@ -199,9 +212,19 @@ public class PagedReferenceImpl implements PagedReference {
       subscription.ack(this);
    }
 
+   @Override
+   public void acknowledge(Transaction tx) throws Exception {
+      if (tx == null) {
+         getQueue().acknowledge(this);
+      }
+      else {
+         getQueue().acknowledge(tx, this);
+      }
+   }
+
    /* (non-Javadoc)
-    * @see java.lang.Object#toString()
-    */
+       * @see java.lang.Object#toString()
+       */
    @Override
    public String toString() {
       String msgToString;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 2de5adb..0e5cd2f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -149,7 +149,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
          }
 
          ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(),
-                                                      new CoreSessionCallback(request.getName(), protocolManager, channel, connection), null, true);
+                                                      new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
          channel.setHandler(handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index c05a288..9d6125b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -56,7 +57,7 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+   public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
       Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);
 
       channel.send(packet);
@@ -79,7 +80,7 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+   public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
       Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount);
 
       int size = 0;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 64633bb..b47df20 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -139,7 +139,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
                                boolean xa,
                                String defaultAddress,
                                SessionCallback callback,
-                               ServerSessionFactory sessionFactory,
                                boolean autoCreateQueues) throws Exception;
 
    SecurityStore getSecurityStore();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 0ff55ac..b1e0dde 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.server;
 
+import org.apache.activemq.artemis.core.transaction.Transaction;
+
 /**
  * A reference to a message.
  *
@@ -35,6 +37,14 @@ public interface MessageReference {
     */
    int getMessageMemoryEstimate();
 
+   /** To be used for holding protocol-specific data during the delivery.
+    *  This is only valid while the message is on the delivering queue at the consumer.  */
+   Object getProtocolData();
+
+   /** To be used for holding protocol-specific data during the delivery.
+    *  This is only valid while the message is on the delivering queue at the consumer.  */
+   void setProtocolData(Object data);
+
    MessageReference copy(Queue queue);
 
    /**
@@ -61,6 +71,8 @@ public interface MessageReference {
 
    void acknowledge() throws Exception;
 
+   void acknowledge(Transaction tx) throws Exception;
+
    void setConsumerId(Long consumerID);
 
    Long getConsumerId();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index c92325a..ec9d4a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -150,6 +150,8 @@ public interface Queue extends Bindable {
 
    int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
 
+   void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception;
+
    boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
 
    int changeReferencesPriority(Filter filter, byte newPriority) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index d75efdd..d157a8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -31,6 +31,12 @@ public interface ServerConsumer extends Consumer {
 
    void fireSlowConsumer();
 
+   /** this is to be used with anything specific on a protocol head. */
+   Object getProtocolData();
+
+   /** this is to be used with anything specific on a protocol head. */
+   void setProtocolData(Object protocolData);
+
    /**
     * @param protocolContext
     * @see #getProtocolContext()
@@ -68,6 +74,12 @@ public interface ServerConsumer extends Consumer {
 
    MessageReference removeReferenceByID(long messageID) throws Exception;
 
+   /** Some protocols may choose to send the message back to delivering instead of redelivering it.
+    *  For example, openwire will redeliver through the client, so messages will go back to the delivering list after rollback. */
+   void backToDelivering(MessageReference reference);
+
+   List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove, Object protocolDataStart, Object protocolDataEnd);
+
    void acknowledge(Transaction tx, long messageID) throws Exception;
 
    void individualAcknowledge(Transaction tx, long messageID) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 62bb3b5..b7a7c47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -36,9 +36,19 @@ public interface ServerSession extends SecurityAuth {
 
    Object getConnectionID();
 
+   /**
+    * Certain protocols may create an internal session that shouldn't go through security checks.
+    * Make sure you don't expose this property through any protocol layer, as that would be a security breach.
+    */
+   void enableSecurity();
+
+   void disableSecurity();
+
    @Override
    RemotingConnection getRemotingConnection();
 
+   Transaction newTransaction();
+
    boolean removeConsumer(long consumerID) throws Exception;
 
    void acknowledge(long consumerID, long messageID) throws Exception;
@@ -87,6 +97,11 @@ public interface ServerSession extends SecurityAuth {
 
    void stop();
 
+   /**
+    * To be used by protocol heads that need to control the transaction outside the session context.
+    */
+   void resetTX(Transaction transaction);
+
    Queue createQueue(SimpleString address,
                      SimpleString name,
                      SimpleString filterString,
@@ -100,6 +115,13 @@ public interface ServerSession extends SecurityAuth {
                                  SimpleString filterString,
                                  boolean browseOnly) throws Exception;
 
+   ServerConsumer createConsumer(final long consumerID,
+                                 final SimpleString queueName,
+                                 final SimpleString filterString,
+                                 final boolean browseOnly,
+                                 final boolean supportLargeMessage,
+                                 final Integer credits) throws Exception;
+
    QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
 
    BindingQueryResult executeBindingQuery(SimpleString address) throws Exception;
@@ -151,6 +173,10 @@ public interface ServerSession extends SecurityAuth {
 
    Transaction getCurrentTransaction();
 
+   ServerConsumer locateConsumer(long consumerID) throws Exception;
+
+   boolean isClosed();
+
    void createSharedQueue(SimpleString address,
                           SimpleString name,
                           boolean durable,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java
deleted file mode 100644
index 6447daa..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.transaction.ResourceManager;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
-public interface ServerSessionFactory {
-
-   ServerSessionImpl createCoreSession(String name,
-                                       String username,
-                                       String password,
-                                       int minLargeMessageSize,
-                                       boolean autoCommitSends,
-                                       boolean autoCommitAcks,
-                                       boolean preAcknowledge,
-                                       boolean persistDeliveryCountBeforeDelivery,
-                                       boolean xa,
-                                       RemotingConnection connection,
-                                       StorageManager storageManager,
-                                       PostOffice postOffice,
-                                       ResourceManager resourceManager,
-                                       SecurityStore securityStore,
-                                       ManagementService managementService,
-                                       ActiveMQServerImpl activeMQServerImpl,
-                                       SimpleString managementAddress,
-                                       SimpleString simpleString,
-                                       SessionCallback callback,
-                                       QueueCreator queueCreator,
-                                       OperationContext context) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 13a1283..69d13bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -112,7 +112,6 @@ import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.ServerSessionFactory;
 import org.apache.activemq.artemis.core.server.ServiceRegistry;
 import org.apache.activemq.artemis.core.server.cluster.BackupManager;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
@@ -1091,7 +1090,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                       final boolean xa,
                                       final String defaultAddress,
                                       final SessionCallback callback,
-                                      final ServerSessionFactory sessionFactory,
                                       final boolean autoCreateQueues) throws Exception {
 
       if (securityStore != null) {
@@ -1105,7 +1103,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       checkSessionLimit(username);
 
       final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
-      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory, autoCreateQueues);
+      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues);
 
       sessions.put(name, session);
 
@@ -1178,14 +1176,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                                      String defaultAddress,
                                                      SessionCallback callback,
                                                      OperationContext context,
-                                                     ServerSessionFactory sessionFactory,
                                                      boolean autoCreateJMSQueues) throws Exception {
-      if (sessionFactory == null) {
-         return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null);
-      }
-      else {
-         return sessionFactory.createCoreSession(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, jmsQueueCreator, context);
-      }
+      return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 9feb60e..932c260 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 
 /**
  * A queue that will discard messages if a newer message with the same
@@ -188,6 +189,16 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
+      public Object getProtocolData() {
+         return ref.getProtocolData();
+      }
+
+      @Override
+      public void setProtocolData(Object data) {
+         ref.setProtocolData(data);
+      }
+
+      @Override
       public void setAlreadyAcked() {
          ref.setAlreadyAcked();
       }
@@ -247,6 +258,11 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
+      public void acknowledge(Transaction tx) throws Exception {
+         ref.acknowledge(tx);
+      }
+
+      @Override
       public void setPersistedCount(int count) {
          ref.setPersistedCount(count);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index fd04b6d..de4d5ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.MemorySize;
 
 /**
@@ -42,6 +43,8 @@ public class MessageReferenceImpl implements MessageReference {
 
    private boolean alreadyAcked;
 
+   private Object protocolData;
+
    // Static --------------------------------------------------------
 
    private static final int memoryOffset;
@@ -86,6 +89,16 @@ public class MessageReferenceImpl implements MessageReference {
 
    // MessageReference implementation -------------------------------
 
+   @Override
+   public Object getProtocolData() {
+      return protocolData;
+   }
+
+   @Override
+   public void setProtocolData(Object protocolData) {
+      this.protocolData = protocolData;
+   }
+
    /**
     * @return the persistedCount
     */
@@ -174,7 +187,16 @@ public class MessageReferenceImpl implements MessageReference {
 
    @Override
    public void acknowledge() throws Exception {
-      queue.acknowledge(this);
+      this.acknowledge(null);
+   }
+
+   public void acknowledge(Transaction tx) throws Exception {
+      if (tx == null) {
+         getQueue().acknowledge(this);
+      }
+      else {
+         getQueue().acknowledge(tx, this);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 86ca36c..75f0f98 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1074,7 +1074,7 @@ public class QueueImpl implements Queue {
          if (isTrace) {
             ActiveMQServerLogger.LOGGER.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
          }
-         move(expiryAddress, ref, true, false);
+         move(null, expiryAddress, ref, true, false);
       }
       else {
          if (isTrace) {
@@ -1461,7 +1461,7 @@ public class QueueImpl implements Queue {
             MessageReference ref = iter.next();
             if (ref.getMessage().getMessageID() == messageID) {
                incDelivering();
-               sendToDeadLetterAddress(ref);
+               sendToDeadLetterAddress(null, ref);
                iter.remove();
                refRemoved(ref);
                return true;
@@ -1480,7 +1480,7 @@ public class QueueImpl implements Queue {
             MessageReference ref = iter.next();
             if (filter == null || filter.match(ref.getMessage())) {
                incDelivering();
-               sendToDeadLetterAddress(ref);
+               sendToDeadLetterAddress(null, ref);
                iter.remove();
                refRemoved(ref);
                count++;
@@ -1507,7 +1507,7 @@ public class QueueImpl implements Queue {
                refRemoved(ref);
                incDelivering();
                try {
-                  move(toAddress, ref, false, rejectDuplicate);
+                  move(null, toAddress, ref, false, rejectDuplicate);
                }
                catch (Exception e) {
                   decDelivering();
@@ -2120,7 +2120,7 @@ public class QueueImpl implements Queue {
          if (isTrace) {
             ActiveMQServerLogger.LOGGER.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
          }
-         sendToDeadLetterAddress(reference, addressSettings.getDeadLetterAddress());
+         sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
 
          return false;
       }
@@ -2337,36 +2337,45 @@ public class QueueImpl implements Queue {
       }
    }
 
-   public void sendToDeadLetterAddress(final MessageReference ref) throws Exception {
-      sendToDeadLetterAddress(ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
+   public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
+      sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
    }
 
-   private void sendToDeadLetterAddress(final MessageReference ref,
+   private void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref,
                                         final SimpleString deadLetterAddress) throws Exception {
       if (deadLetterAddress != null) {
          Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
 
          if (bindingList.getBindings().isEmpty()) {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
-            acknowledge(ref);
+            ref.acknowledge(tx);
          }
          else {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
-            move(deadLetterAddress, ref, false, false);
+            move(tx, deadLetterAddress, ref, false, false);
          }
       }
       else {
          ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(name);
 
-         acknowledge(ref);
+         ref.acknowledge(tx);
       }
    }
 
-   private void move(final SimpleString address,
+   private void move(final Transaction originalTX,
+                     final SimpleString address,
                      final MessageReference ref,
                      final boolean expiry,
                      final boolean rejectDuplicate) throws Exception {
-      Transaction tx = new TransactionImpl(storageManager);
+      Transaction tx;
+
+      if (originalTX != null) {
+         tx = originalTX;
+      }
+      else {
+         // if no TX we create a new one to commit at the end
+         tx = new TransactionImpl(storageManager);
+      }
 
       ServerMessage copyMessage = makeCopy(ref, expiry);
 
@@ -2376,7 +2385,9 @@ public class QueueImpl implements Queue {
 
       acknowledge(tx, ref);
 
-      tx.commit();
+      if (originalTX == null) {
+         tx.commit();
+      }
    }
 
    /*


Mime
View raw message