Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1692718D2C for ; Fri, 1 Apr 2016 02:20:18 +0000 (UTC) Received: (qmail 56087 invoked by uid 500); 1 Apr 2016 02:20:18 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 55959 invoked by uid 500); 1 Apr 2016 02:20:17 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 55852 invoked by uid 99); 1 Apr 2016 02:20:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 02:20:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85CC0E03CE; Fri, 1 Apr 2016 02:20:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 01 Apr 2016 02:20:20 -0000 Message-Id: In-Reply-To: <3e55bfe96b804ed3b315f0e1a0a90fc3@git.apache.org> References: <3e55bfe96b804ed3b315f0e1a0a90fc3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] activemq-artemis git commit: major refactoring on Transactions and AMQ objects major refactoring on Transactions and AMQ objects Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fb445681 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fb445681 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fb445681 Branch: refs/heads/refactor-openwire Commit: fb4456813672e7e00e29502aa243205ef6cc191c Parents: 41a55a1 Author: Clebert Suconic Authored: Thu Mar 31 14:48:56 2016 -0400 Committer: Clebert Suconic Committed: Thu Mar 31 22:19:47 2016 -0400 ---------------------------------------------------------------------- .../plug/ProtonSessionIntegrationCallback.java | 7 +- .../protocol/mqtt/MQTTConnectionManager.java | 6 +- .../core/protocol/mqtt/MQTTSessionCallback.java | 7 +- .../protocol/openwire/AMQTransactionImpl.java | 59 -- .../protocol/openwire/OpenWireConnection.java | 385 ++++++++++--- .../openwire/OpenWireMessageConverter.java | 1 + .../openwire/OpenWireProtocolManager.java | 117 +--- .../core/protocol/openwire/OpenWireUtil.java | 73 --- .../amq/AMQCompositeConsumerBrokerExchange.java | 9 +- .../core/protocol/openwire/amq/AMQConsumer.java | 247 +++----- .../openwire/amq/AMQConsumerBrokerExchange.java | 2 + .../openwire/amq/AMQServerConsumer.java | 102 ---- .../protocol/openwire/amq/AMQServerSession.java | 391 ------------- .../openwire/amq/AMQServerSessionFactory.java | 69 --- .../core/protocol/openwire/amq/AMQSession.java | 237 ++------ .../amq/AMQSingleConsumerBrokerExchange.java | 8 +- .../openwire/amq/AMQTransactionFactory.java | 32 -- .../core/protocol/openwire/amq/MessageInfo.java | 47 -- .../protocol/openwire/util/OpenWireUtil.java | 83 +++ .../protocol/stomp/StompProtocolManager.java | 4 +- .../core/protocol/stomp/StompSession.java | 5 +- .../core/paging/cursor/PagedReferenceImpl.java | 27 +- .../core/impl/ActiveMQPacketHandler.java | 2 +- .../protocol/core/impl/CoreSessionCallback.java | 5 +- .../artemis/core/server/ActiveMQServer.java | 1 - .../artemis/core/server/MessageReference.java | 12 + .../activemq/artemis/core/server/Queue.java | 2 + .../artemis/core/server/ServerConsumer.java | 12 + .../artemis/core/server/ServerSession.java | 26 + .../core/server/impl/ActiveMQServerImpl.java | 11 +- .../core/server/impl/LastValueQueue.java | 16 + .../core/server/impl/MessageReferenceImpl.java | 24 +- .../artemis/core/server/impl/QueueImpl.java | 39 +- .../core/server/impl/ServerConsumerImpl.java | 83 ++- .../core/server/impl/ServerSessionImpl.java | 138 ++--- .../artemis/core/transaction/Transaction.java | 7 + .../core/transaction/TransactionFactory.java | 26 - .../core/transaction/impl/TransactionImpl.java | 12 + .../spi/core/protocol/SessionCallback.java | 11 +- .../impl/ScheduledDeliveryHandlerTest.java | 4 + .../failover/FailoverTransactionTest.java | 10 +- .../integration/client/HangConsumerTest.java | 10 +- .../integration/openwire/BasicSecurityTest.java | 9 +- .../integration/openwire/OpenWireUtilTest.java | 2 +- .../openwire/SimpleOpenWireTest.java | 572 ++++++++++++++++++- .../InvestigationOpenwireTest.java | 246 -------- .../core/postoffice/impl/BindingsImplTest.java | 10 + .../unit/core/postoffice/impl/FakeQueue.java | 5 + 48 files changed, 1434 insertions(+), 1779 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 5d6af2a..f101dc7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executor; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.qpid.proton.amqp.Binary; @@ -117,7 +118,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se false, // boolean autoCommitAcks, false, // boolean preAcknowledge, true, //boolean xa, - (String) null, this, null, true); + (String) null, this, true); } @Override @@ -341,7 +342,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); @@ -359,7 +360,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { + public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { return 0; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 9d60513..a3b8b78 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -96,7 +96,11 @@ public class MQTTConnectionManager { String id = UUIDGenerator.getInstance().generateStringUUID(); ActiveMQServer server = session.getServer(); - ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), null, // Session factory + ServerSession serverSession = server.createSession(id, username, password, + ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, + session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, + MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, + MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE); return (ServerSessionImpl) serverSession; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 28d86b8..82b1ed6 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -41,7 +42,7 @@ public class MQTTSessionCallback implements SessionCallback { } @Override - public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference referece, ServerMessage message, ServerConsumer consumer, int deliveryCount) { try { session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount); } @@ -62,8 +63,8 @@ public class MQTTSessionCallback implements SessionCallback { } @Override - public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { - return sendMessage(message, consumer, deliveryCount); + public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { + return sendMessage(reference, message, consumer, deliveryCount); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java deleted file mode 100644 index bbd7e95..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire; - -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.impl.RefsOperation; -import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; - -import javax.transaction.xa.Xid; - -public class AMQTransactionImpl extends TransactionImpl { - - private boolean rollbackForClose = false; - - public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) { - super(xid, storageManager, timeoutSeconds); - } - - @Override - public RefsOperation createRefsOperation(Queue queue) { - return new AMQrefsOperation(queue, storageManager); - } - - public class AMQrefsOperation extends RefsOperation { - - public AMQrefsOperation(Queue queue, StorageManager storageManager) { - super(queue, storageManager); - } - - - // This is because the Rollbacks happen through the consumer, not through the server's - @Override - public void afterRollback(Transaction tx) { - if (rollbackForClose) { - super.afterRollback(tx); - } - } - } - - public void setRollbackForClose() { - this.rollbackForClose = true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index e8259c3..f1eb8c6 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -16,15 +16,18 @@ */ package org.apache.activemq.artemis.core.protocol.openwire; +import javax.jms.IllegalStateException; import javax.jms.InvalidClientIDException; import javax.jms.InvalidDestinationException; import javax.jms.JMSSecurityException; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -45,22 +48,30 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionConte import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.core.server.impl.RefsOperation; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -131,28 +142,39 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private Map sessionIdMap = new ConcurrentHashMap<>(); - private final Map consumerExchanges = new HashMap<>(); - private final Map producerExchanges = new HashMap<>(); + private final Map consumerExchanges = new ConcurrentHashMap<>(); + private final Map producerExchanges = new ConcurrentHashMap<>(); // Clebert TODO: Artemis already stores the Session. Why do we need a different one here private Map sessions = new ConcurrentHashMap<>(); - - private ConnectionState state; private final Set tempQueues = new ConcurrentHashSet<>(); - private Map txMap = new ConcurrentHashMap<>(); + + /** Openwire doesn't sen transactions associated with any sessions. + * It will however send beingTX / endTX as it would be doing it with XA Transactions. + * But always without any association with Sessions. + * This collection will hold nonXA transactions. Hopefully while they are in transit only. */ + private Map txMap = new ConcurrentHashMap<>(); private volatile AMQSession advisorySession; + private final ActiveMQServer server; + + /** This is to be used with connection operations that don't have a session. + * Such as TM operations. */ + private ServerSession internalSession; + // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, + ActiveMQServer server, Executor executor, OpenWireProtocolManager openWireProtocolManager, OpenWireFormat wf) { super(connection, executor); + this.server = server; this.protocolManager = openWireProtocolManager; this.wireFormat = wf; } @@ -227,8 +249,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se response = command.visit(commandProcessorInstance); } catch (Exception e) { + // TODO: logging + e.printStackTrace(); if (responseRequired) { - response = new ExceptionResponse(e); + response = convertException(e); } } finally { @@ -276,6 +300,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } public void sendException(Exception e) { + Response resp = convertException(e); + try { + dispatch(resp); + } + catch (IOException e2) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2); + } + } + + private Response convertException(Exception e) { Response resp; if (e instanceof ActiveMQSecurityException) { resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); @@ -286,12 +320,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se else { resp = new ExceptionResponse(e); } - try { - dispatch(resp); - } - catch (IOException e2) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2); - } + return resp; } private void setLastCommand(Command command) { @@ -471,12 +500,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return result; } - private void removeConsumerBrokerExchange(ConsumerId id) { - synchronized (consumerExchanges) { - consumerExchanges.remove(id); - } - } - public void deliverMessage(MessageDispatch dispatch) { Message m = dispatch.getMessage(); if (m != null) { @@ -576,7 +599,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - public AMQConnectionContext initContext(ConnectionInfo info) { + public AMQConnectionContext initContext(ConnectionInfo info) throws Exception { WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo(); // Older clients should have been defaulting this field to true.. but // they were not. @@ -608,9 +631,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se info.setClientIp(getRemoteAddress()); } + createInternalSession(info); + return context; } + private void createInternalSession(ConnectionInfo info) throws Exception { + internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true); + } + //raise the refCount of context public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) { this.context = existingContext; @@ -663,17 +692,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se ActiveMQDestination dest = info.getDestination(); if (dest.isQueue()) { SimpleString qName = OpenWireUtil.toCoreAddress(dest); - QueueBinding binding = (QueueBinding) protocolManager.getServer().getPostOffice().getBinding(qName); + QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); if (binding == null) { if (getState().getInfo() != null) { CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; - protocolManager.getServer().getSecurityStore().check(qName, checkType, this); + server.getSecurityStore().check(qName, checkType, this); - protocolManager.getServer().checkQueueCreationLimit(getUsername()); + server.checkQueueCreationLimit(getUsername()); } ConnectionInfo connInfo = getState().getInfo(); - protocolManager.getServer().createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary()); + server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary()); } if (dest.isTemporary()) { @@ -692,9 +721,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void updateConsumer(ConsumerControl consumerControl) { - SessionId sessionId = consumerControl.getConsumerId().getParentId(); - AMQSession amqSession = sessions.get(sessionId); - amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch()); + ConsumerId consumerId = consumerControl.getConsumerId(); + AMQConsumerBrokerExchange exchange = this.consumerExchanges.get(consumerId); + if (exchange != null) { + exchange.updateConsumerPrefetchSize(consumerControl.getPrefetch()); + } } public void addConsumer(ConsumerInfo info) throws Exception { @@ -707,7 +738,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { - throw new IllegalStateException(protocolManager.getServer() + " Cannot add a consumer to a session that had not been registered: " + sessionId); + throw new IllegalStateException(server + " Cannot add a consumer to a session that had not been registered: " + sessionId); } // Avoid replaying dup commands if (!ss.getConsumerIds().contains(info.getConsumerId())) { @@ -729,13 +760,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public void onSlowConsumer(ServerConsumer consumer) { - if (consumer instanceof AMQServerConsumer) { - AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer; - ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getOpenwireDestination()); + if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) { + AMQConsumer amqConsumer = (AMQConsumer)consumer.getProtocolData(); + ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination()); ActiveMQMessage advisoryMessage = new ActiveMQMessage(); try { - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString()); - protocolManager.fireAdvisory(context, topic, advisoryMessage, serverConsumer.getAmqConsumer().getId()); + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString()); + protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId()); } catch (Exception e) { // TODO-NOW: LOGGING @@ -758,9 +789,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } public AMQSession addSession(SessionInfo ss, boolean internal) { - AMQSession amqSession = new AMQSession(getState().getInfo(), ss, protocolManager.getServer(), this, protocolManager.getScheduledPool(), protocolManager); + AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager.getScheduledPool()); amqSession.initialize(); - amqSession.setInternal(internal); + + if (internal) { + amqSession.disableSecurity(); + } + sessions.put(ss.getSessionId(), amqSession); sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId()); return amqSession; @@ -780,10 +815,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void removeDestination(ActiveMQDestination dest) throws Exception { if (dest.isQueue()) { SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName()); - protocolManager.getServer().destroyQueue(qName); + server.destroyQueue(qName); } else { - Bindings bindings = protocolManager.getServer().getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName())); + Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName())); Iterator iterator = bindings.getBindings().iterator(); while (iterator.hasNext()) { @@ -815,7 +850,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private void validateDestination(ActiveMQDestination destination) throws Exception { if (destination.isQueue()) { SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); - BindingQueryResult result = protocolManager.getServer().bindingQuery(physicalName); + BindingQueryResult result = server.bindingQuery(physicalName); if (!result.isExists() && !result.isAutoCreateJmsQueues()) { throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); } @@ -934,18 +969,70 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { - protocolManager.removeSubscription(subInfo); + SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); + server.destroyQueue(subQueueName); + return null; } @Override public Response processRollbackTransaction(TransactionInfo info) throws Exception { - protocolManager.rollbackTransaction(info); - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); + Transaction tx = lookupTX(info.getTransactionId(), null); + if (info.getTransactionId().isXATransaction() && tx == null) { + throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA); + } + else if(tx != null) { + + AMQSession amqSession = (AMQSession)tx.getProtocolData(); + + if (amqSession != null) { + amqSession.getCoreSession().resetTX(tx); + + try { + returnReferences(tx, amqSession); + } + finally { + amqSession.getCoreSession().resetTX(null); + } + } + tx.rollback(); + } + + return null; } + /** Openwire will redeliver rolled back references. + * We need to return those here. */ + private void returnReferences(Transaction tx, AMQSession session) throws Exception { + if (session == null || session.isClosed()) { + return; + } + + RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + + if (oper != null) { + List ackRefs = oper.getReferencesToAcknowledge(); + + for (ListIterator referenceIterator = ackRefs.listIterator(ackRefs.size()); referenceIterator.hasPrevious();){ + MessageReference ref = referenceIterator.previous(); + + Long consumerID = ref.getConsumerId(); + + ServerConsumer consumer = null; + if (consumerID != null) { + consumer = session.getCoreSession().locateConsumer(consumerID); + } + + if (consumer != null) { + System.out.println("Returning reference " + ref.getMessage()); + referenceIterator.remove(); + consumer.backToDelivering(ref); + } + } + } + } + @Override public Response processShutdown(ShutdownInfo info) throws Exception { OpenWireConnection.this.shutdown(false); @@ -989,44 +1076,137 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processBeginTransaction(TransactionInfo info) throws Exception { - TransactionId txId = info.getTransactionId(); + final TransactionId txID = info.getTransactionId(); - if (!txMap.containsKey(txId)) { - txMap.put(txId, info); + try { + internalSession.resetTX(null); + if (txID.isXATransaction()) { + Xid xid = OpenWireUtil.toXID(txID); + internalSession.xaStart(xid); + } + else { + Transaction transaction = internalSession.newTransaction(); + txMap.put(txID, transaction); + transaction.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + txMap.remove(txID); + } + }); + } + } + finally { + internalSession.resetTX(null); } return null; } @Override - public Response processBrokerInfo(BrokerInfo arg0) throws Exception { - throw new IllegalStateException("not implemented! "); + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + return processCommit(info, true); + } + + private Response processCommit(TransactionInfo info, boolean onePhase) throws Exception { + TransactionId txID = info.getTransactionId(); + + Transaction tx = lookupTX(txID, null); + + AMQSession session = (AMQSession)tx.getProtocolData(); + + tx.commit(onePhase); + + return null; } @Override - public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { - try { - protocolManager.commitTransactionOnePhase(info); - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + return processCommit(info, false); + } + + @Override + public Response processForgetTransaction(TransactionInfo info) throws Exception { + TransactionId txID = info.getTransactionId(); + + if (txID.isXATransaction()) { + try { + Xid xid = OpenWireUtil.toXID(info.getTransactionId()); + internalSession.xaForget(xid); + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } } - catch (Exception e) { - e.printStackTrace(); - throw e; + else { + txMap.remove(txID); } return null; } + @Override - public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { - protocolManager.commitTransactionTwoPhase(info); - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + TransactionId txID = info.getTransactionId(); + + try { + if (txID.isXATransaction()) { + try { + Xid xid = OpenWireUtil.toXID(info.getTransactionId()); + internalSession.xaPrepare(xid); + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + else { + Transaction tx = lookupTX(txID, null); + tx.prepare(); + } + } + finally { + internalSession.resetTX(null); + } + + return new IntegerResponse(XAResource.XA_RDONLY); + } + + + @Override + public Response processEndTransaction(TransactionInfo info) throws Exception { + TransactionId txID = info.getTransactionId(); + + if (txID.isXATransaction()) { + try { + Transaction tx = lookupTX(txID, null); + internalSession.resetTX(tx); + try { + Xid xid = OpenWireUtil.toXID(info.getTransactionId()); + internalSession.xaEnd(xid); + } + finally { + internalSession.resetTX(null); + } + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + else { + txMap.remove(info); + } return null; } @Override + public Response processBrokerInfo(BrokerInfo arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override public Response processConnectionControl(ConnectionControl connectionControl) throws Exception { //activemq5 keeps a var to remember only the faultTolerant flag //this can be sent over a reconnected transport as the first command @@ -1058,31 +1238,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } @Override - public Response processEndTransaction(TransactionInfo info) throws Exception { - protocolManager.endTransaction(info); - TransactionId txId = info.getTransactionId(); - - if (!txMap.containsKey(txId)) { - txMap.put(txId, info); - } - return null; - } - - @Override public Response processFlush(FlushCommand arg0) throws Exception { throw new IllegalStateException("not implemented! "); } @Override - public Response processForgetTransaction(TransactionInfo info) throws Exception { - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); - - protocolManager.forgetTransaction(info.getTransactionId()); - return null; - } - - @Override public Response processKeepAlive(KeepAliveInfo arg0) throws Exception { throw new IllegalStateException("not implemented! "); } @@ -1097,15 +1257,33 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se AMQSession session = getSession(producerId.getParentId()); - session.send(producerInfo, messageSend, sendProducerAck); + Transaction tx = lookupTX(messageSend.getTransactionId(), session); + + session.getCoreSession().resetTX(tx); + try { + session.send(producerInfo, messageSend, sendProducerAck); + } + finally { + session.getCoreSession().resetTX(null); + } + return null; } @Override public Response processMessageAck(MessageAck ack) throws Exception { - AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); - consumerBrokerExchange.acknowledge(ack); + AMQSession session = getSession(ack.getConsumerId().getParentId()); + Transaction tx = lookupTX(ack.getTransactionId(), session); + session.getCoreSession().resetTX(tx); + + try { + AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); + consumerBrokerExchange.acknowledge(ack); + } + finally { + session.getCoreSession().resetTX(null); + } return null; } @@ -1130,13 +1308,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } @Override - public Response processPrepareTransaction(TransactionInfo info) throws Exception { - protocolManager.prepareTransaction(info); - //activemq needs a rdonly response - return new IntegerResponse(XAResource.XA_RDONLY); - } - - @Override public Response processProducerAck(ProducerAck arg0) throws Exception { // a broker doesn't do producers.. this shouldn't happen return null; @@ -1186,15 +1357,45 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se ConsumerInfo info = consumerState.getInfo(); info.setLastDeliveredSequenceId(lastDeliveredSequenceId); - AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id); + AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.remove(id); consumerBrokerExchange.removeConsumer(); - removeConsumerBrokerExchange(id); + return null; + } + + } + private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException { + if (txID == null) { return null; } + Xid xid = null; + Transaction transaction; + if (txID.isXATransaction()) { + xid = OpenWireUtil.toXID(txID); + transaction = server.getResourceManager().getTransaction(xid); + } + else { + transaction = txMap.get(txID); + } + + if (transaction == null) { + throw new IllegalStateException("cannot find transactionInfo::" + txID + " xid=" + xid); + } + + if (session != null && transaction.getProtocolData() != session) { + transaction.setProtocolData(session); + } + + return transaction; + } + + public static XAException newXAException(String s, int errorCode) { + XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode); + xaException.errorCode = errorCode; + return xaException; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 89f71ed..b0a6d46 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; +import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index bbbb696..c87fbea 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -33,7 +33,6 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; @@ -101,10 +100,6 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl private String brokerName; - // Clebert: Artemis already has a Resource Manager. Need to remove this.. - // The TransactionID extends XATransactionID, so all we need is to convert the XID here - private Map transactions = new ConcurrentHashMap<>(); - private final Map topologyMap = new ConcurrentHashMap<>(); private final LinkedList members = new LinkedList<>(); @@ -140,7 +135,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } public OpenWireFormat getNewWireFormat() { - return (OpenWireFormat)wireFactory.createWireFormat(); + return (OpenWireFormat) wireFactory.createWireFormat(); } @Override @@ -156,9 +151,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } } - - public void removeConnection(ConnectionInfo info, - Throwable error) throws InvalidClientIDException { + public void removeConnection(ConnectionInfo info, Throwable error) throws InvalidClientIDException { synchronized (clientIdSet) { String clientId = info.getClientId(); if (clientId != null) { @@ -176,7 +169,6 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } } - public ScheduledExecutorService getScheduledPool() { return scheduledPool; } @@ -223,7 +215,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); - OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf); + OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf); owConn.sendHandshake(); // TODO CLEBERT What is this constant here? we should get it from TTL initial pings @@ -323,7 +315,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl fireAdvisory(context, topic, copy); // init the conn - context.getConnection().addSessions( context.getConnectionState().getSessionIds()); + context.getConnection().addSessions(context.getConnectionState().getSessionIds()); } } @@ -343,9 +335,9 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl * See AdvisoryBroker.fireAdvisory() */ public void fireAdvisory(AMQConnectionContext context, - ActiveMQTopic topic, - Command command, - ConsumerId targetConsumerId) throws Exception { + ActiveMQTopic topic, + Command command, + ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; @@ -448,55 +440,6 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl public boolean isStopping() { return false; } - public void endTransaction(TransactionInfo info) throws Exception { - AMQSession txSession = transactions.get(info.getTransactionId()); - - if (txSession != null) { - txSession.endTransaction(info); - } - } - - public void commitTransactionOnePhase(TransactionInfo info) throws Exception { - AMQSession txSession = transactions.get(info.getTransactionId()); - - if (txSession != null) { - txSession.commitOnePhase(info); - } - transactions.remove(info.getTransactionId()); - } - - public void prepareTransaction(TransactionInfo info) throws Exception { - XATransactionId xid = (XATransactionId) info.getTransactionId(); - AMQSession txSession = transactions.get(xid); - if (txSession != null) { - txSession.prepareTransaction(xid); - } - } - - public void commitTransactionTwoPhase(TransactionInfo info) throws Exception { - XATransactionId xid = (XATransactionId) info.getTransactionId(); - AMQSession txSession = transactions.get(xid); - if (txSession != null) { - txSession.commitTwoPhase(xid); - } - transactions.remove(xid); - } - - public void rollbackTransaction(TransactionInfo info) throws Exception { - AMQSession txSession = transactions.get(info.getTransactionId()); - if (txSession != null) { - txSession.rollback(info); - } - else if (info.getTransactionId().isLocalTransaction()) { - //during a broker restart, recovered local transaction may not be registered - //in that case we ignore and let the tx removed silently by connection. - //see AMQ1925Test.testAMQ1925_TXBegin - } - else { - throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA); - } - transactions.remove(info.getTransactionId()); - } public boolean validateUser(String login, String passcode) { boolean validated = true; @@ -510,26 +453,6 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl return validated; } - public void forgetTransaction(TransactionId xid) throws Exception { - AMQSession txSession = transactions.get(xid); - if (txSession != null) { - txSession.forget(xid); - } - transactions.remove(xid); - } - - /** - * TODO: remove this, use the regular ResourceManager from the Server's - */ - public void registerTx(TransactionId txId, AMQSession amqSession) { - transactions.put(txId, amqSession); - } - - public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception { - SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); - server.destroyQueue(subQueueName); - } - public void sendBrokerInfo(OpenWireConnection connection) throws Exception { BrokerInfo brokerInfo = new BrokerInfo(); brokerInfo.setBrokerName(getBrokerName()); @@ -543,14 +466,26 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl connection.dispatch(brokerInfo); } + /** + * URI property + */ + @SuppressWarnings("unused") public void setRebalanceClusterClients(boolean rebalance) { this.rebalanceClusterClients = rebalance; } + /** + * URI property + */ + @SuppressWarnings("unused") public boolean isRebalanceClusterClients() { return this.rebalanceClusterClients; } + /** + * URI property + */ + @SuppressWarnings("unused") public void setUpdateClusterClients(boolean updateClusterClients) { this.updateClusterClients = updateClusterClients; } @@ -559,10 +494,18 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl return this.updateClusterClients; } + /** + * URI property + */ + @SuppressWarnings("unused") public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; } + /** + * URI property + */ + @SuppressWarnings("unused") public boolean isUpdateClusterClientsOnRemove() { return this.updateClusterClientsOnRemove; } @@ -571,10 +514,4 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl this.brokerName = name; } - public static XAException newXAException(String s, int errorCode) { - XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode); - xaException.errorCode = errorCode; - return xaException; - } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java deleted file mode 100644 index 4513eb3..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.ByteSequence; - -public class OpenWireUtil { - - public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) { - ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length); - - buffer.writeBytes(bytes.data, bytes.offset, bytes.length); - return buffer; - } - - public static SimpleString toCoreAddress(ActiveMQDestination dest) { - if (dest.isQueue()) { - return new SimpleString("jms.queue." + dest.getPhysicalName()); - } - else { - return new SimpleString("jms.topic." + dest.getPhysicalName()); - } - } - - /** - * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the - * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was - * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the - * consumer - */ - public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) { - String address = message.getAddress().toString(); - String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", ""); - if (actualDestination.isQueue()) { - return new ActiveMQQueue(strippedAddress); - } - else { - return new ActiveMQTopic(strippedAddress); - } - } - - /* - *This util converts amq wildcards to compatible core wildcards - *The conversion is like this: - *AMQ * wildcard --> Core * wildcard (no conversion) - *AMQ > wildcard --> Core # wildcard - */ - public static String convertWildcard(String physicalName) { - return physicalName.replaceAll("(\\.>)+", ".#"); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java index 56b4b6d..5b9d72e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java @@ -48,7 +48,7 @@ public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchang public void acknowledge(MessageAck ack) throws Exception { AMQConsumer amqConsumer = consumerMap.get(ack.getDestination()); if (amqConsumer != null) { - amqSession.acknowledge(ack, amqConsumer); + amqConsumer.acknowledge(ack); } } @@ -58,4 +58,11 @@ public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchang amqConsumer.removeConsumer(); } } + + @Override + public void updateConsumerPrefetchSize(int prefetch) { + for (AMQConsumer amqConsumer : consumerMap.values()) { + amqConsumer.setPrefetchSize(prefetch); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index ef9b2a8..e65dbb8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -17,10 +17,8 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.io.IOException; -import java.util.Iterator; -import java.util.Set; +import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -30,11 +28,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; +import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -42,7 +43,6 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.TransactionId; import org.apache.activemq.wireformat.WireFormat; public class AMQConsumer { @@ -50,11 +50,10 @@ public class AMQConsumer { private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; private final ScheduledExecutorService scheduledPool; - private long nativeId = -1; + private ServerConsumer serverConsumer; private int prefetchSize; - private AtomicInteger windowAvailable; - private final java.util.Queue deliveringRefs = new ConcurrentLinkedQueue<>(); + private AtomicInteger currentWindow; private long messagePullSequence = 0; private MessagePullHandler messagePullHandler; @@ -67,20 +66,13 @@ public class AMQConsumer { this.info = info; this.scheduledPool = scheduledPool; this.prefetchSize = info.getPrefetchSize(); - this.windowAvailable = new AtomicInteger(prefetchSize); + this.currentWindow = new AtomicInteger(prefetchSize); if (prefetchSize == 0) { messagePullHandler = new MessagePullHandler(); } } public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { - this.nativeId = nativeId; - AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener); - serverConsumer.setAmqConsumer(this); - } - - - private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); @@ -93,13 +85,13 @@ public class AMQConsumer { SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address); - AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); + serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); - return serverConsumer; } else { - SimpleString queueName = new SimpleString("jms.queue." + physicalName); - AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); + SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination); + session.getCoreServer().getJMSQueueCreator().create(queueName); + serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString()); if (addrSettings != null) { @@ -113,10 +105,10 @@ public class AMQConsumer { } } - return serverConsumer; - } + serverConsumer.setProtocolData(this); + } private SimpleString createTopicSubscription(boolean isDurable, @@ -167,12 +159,6 @@ public class AMQConsumer { return queueName; } - - - public long getNativeId() { - return this.nativeId; - } - public ConsumerId getId() { return info.getConsumerId(); } @@ -182,16 +168,17 @@ public class AMQConsumer { } public void acquireCredit(int n) throws Exception { - boolean promptDelivery = windowAvailable.get() == 0; - if (windowAvailable.get() < prefetchSize) { - this.windowAvailable.addAndGet(n); - } + int oldwindow = currentWindow.getAndAdd(n); + + boolean promptDelivery = oldwindow < prefetchSize; + if (promptDelivery) { - session.getCoreSession().promptDelivery(nativeId); + serverConsumer.promptDelivery(); } + } - public int handleDeliver(ServerMessage message, int deliveryCount) { + public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) { MessageDispatch dispatch; try { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { @@ -200,9 +187,9 @@ public class AMQConsumer { //decrement deliveryCount as AMQ client tends to add 1. dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); int size = dispatch.getMessage().getSize(); - this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size)); + reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); - windowAvailable.decrementAndGet(); + currentWindow.decrementAndGet(); return size; } catch (IOException e) { @@ -218,114 +205,59 @@ public class AMQConsumer { md.setConsumerId(getId()); md.setDestination(openwireDestination); session.deliverMessage(md); - windowAvailable.decrementAndGet(); } + /** The acknowledgement in openwire is done based on intervals. + * We will iterate through the list of delivering messages at {@link ServerConsumer#getDeliveringReferencesBasedOnProtocol(boolean, Object, Object)} + * and add those to the Transaction. + * Notice that we will start a new transaction on the cases where there is no transaction. */ public void acknowledge(MessageAck ack) throws Exception { + + MessageId first = ack.getFirstMessageId(); - MessageId lastm = ack.getLastMessageId(); - TransactionId tid = ack.getTransactionId(); - boolean isLocalTx = (tid != null) && tid.isLocalTransaction(); - boolean single = lastm.equals(first); - - MessageInfo mi = null; - int n = 0; - - if (ack.isIndividualAck()) { - Iterator iter = deliveringRefs.iterator(); - while (iter.hasNext()) { - mi = iter.next(); - if (mi.amqId.equals(lastm)) { - n++; - if (!isLocalTx) { - iter.remove(); - session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId); - } - else { - mi.setLocalAcked(true); - } - if (tid == null) { - session.getCoreSession().commit(); - } - break; - } - } + MessageId last = ack.getLastMessageId(); + + if (first == null) { + first = last; } - else if (ack.isRedeliveredAck()) { - //client tells that this message is for redlivery. - //do nothing until poisoned. - n = ack.getMessageCount(); + + boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only, nothing to be acked, we just remove the lists + + if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) { + removeReferences = false; } - else if (ack.isPoisonAck()) { - //send to dlq - Iterator iter = deliveringRefs.iterator(); - boolean firstFound = false; - while (iter.hasNext()) { - mi = iter.next(); - if (mi.amqId.equals(first)) { - n++; - iter.remove(); - session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause()); - session.getCoreSession().commit(); - if (single) { - break; - } - firstFound = true; - } - else if (firstFound || first == null) { - n++; - iter.remove(); - session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause()); - session.getCoreSession().commit(); - if (mi.amqId.equals(lastm)) { - break; - } - } + + List ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last); + + acquireCredit(ack.getMessageCount()); + + if (removeReferences) { + + Transaction originalTX = session.getCoreSession().getCurrentTransaction(); + Transaction transaction; + + if (originalTX == null) { + transaction = session.getCoreSession().newTransaction(); } - } - else if (ack.isDeliveredAck() || ack.isExpiredAck()) { - //ToDo: implement with tests - n = ack.getMessageCount(); - } - else { - Iterator iter = deliveringRefs.iterator(); - boolean firstFound = false; - while (iter.hasNext()) { - MessageInfo ami = iter.next(); - if (ami.amqId.equals(first)) { - n++; - if (!isLocalTx) { - iter.remove(); - } - else { - ami.setLocalAcked(true); - } - if (single) { - mi = ami; - break; - } - firstFound = true; + else { + transaction = originalTX; + } + + if (ack.isIndividualAck() || ack.isStandardAck()) { + for (MessageReference ref : ackList) { + ref.acknowledge(transaction); } - else if (firstFound || first == null) { - n++; - if (!isLocalTx) { - iter.remove(); - } - else { - ami.setLocalAcked(true); - } - if (ami.amqId.equals(lastm)) { - mi = ami; - break; - } + } + else if (ack.isPoisonAck()) { + for (MessageReference ref : ackList) { + ref.getQueue().sendToDeadLetterAddress(transaction, ref); } } - if (mi != null && !isLocalTx) { - session.getCoreSession().acknowledge(nativeId, mi.nativeId); + + if (originalTX == null) { + transaction.commit(true); } } - - acquireCredit(n); } public void browseFinished() { @@ -337,61 +269,23 @@ public class AMQConsumer { session.deliverMessage(md); } - //this is called before session commit a local tx - public void finishTx() throws Exception { - MessageInfo lastMi = null; - - MessageInfo mi = null; - Iterator iter = deliveringRefs.iterator(); - while (iter.hasNext()) { - mi = iter.next(); - if (mi.isLocalAcked()) { - iter.remove(); - lastMi = mi; - } - } - - if (lastMi != null) { - session.getCoreSession().acknowledge(nativeId, lastMi.nativeId); - } - } - - public void rollbackTx(Set acked) throws Exception { - MessageInfo lastMi = null; - - MessageInfo mi = null; - Iterator iter = deliveringRefs.iterator(); - while (iter.hasNext()) { - mi = iter.next(); - if (mi.isLocalAcked()) { - acked.add(mi.nativeId); - lastMi = mi; - } - } - - if (lastMi != null) { - session.getCoreSession().acknowledge(nativeId, lastMi.nativeId); - } - } - public ConsumerInfo getInfo() { return info; } public boolean hasCredits() { - return windowAvailable.get() > 0; + return currentWindow.get() > 0; } public void processMessagePull(MessagePull messagePull) throws Exception { - windowAvailable.incrementAndGet(); - + // windowAvailable.incrementAndGet(); if (messagePullHandler != null) { messagePullHandler.nextSequence(messagePullSequence++, messagePull.getTimeout()); } } public void removeConsumer() throws Exception { - session.removeConsumer(nativeId); + serverConsumer.close(false); } public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() { @@ -400,10 +294,10 @@ public class AMQConsumer { public void setPrefetchSize(int prefetchSize) { this.prefetchSize = prefetchSize; - this.windowAvailable.set(prefetchSize); + this.currentWindow.set(prefetchSize); this.info.setPrefetchSize(prefetchSize); if (this.prefetchSize > 0) { - session.getCoreSession().promptDelivery(nativeId); + serverConsumer.promptDelivery(); } } @@ -421,7 +315,7 @@ public class AMQConsumer { this.next = next; this.timeout = timeout; latch = new CountDownLatch(1); - session.getCoreSession().forceConsumerDelivery(nativeId, messagePullSequence); + serverConsumer.forceDelivery(messagePullSequence); //if we are 0 timeout or less we need to wait to get either the forced message or a real message. if (timeout <= 0) { latch.await(10, TimeUnit.SECONDS); @@ -434,7 +328,6 @@ public class AMQConsumer { public boolean checkForcedConsumer(ServerMessage message) { if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { - System.out.println("MessagePullHandler.checkForcedConsumer"); if (next >= 0) { if (timeout <= 0) { latch.countDown(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java index 21a45b1..0132465 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java @@ -32,4 +32,6 @@ public abstract class AMQConsumerBrokerExchange { public abstract void processMessagePull(MessagePull messagePull) throws Exception; public abstract void removeConsumer() throws Exception; + + public abstract void updateConsumerPrefetchSize(int prefetch); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java deleted file mode 100644 index 2f9d0bc..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.util.List; - -import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.QueueBinding; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.QueueImpl; -import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; -import org.apache.activemq.artemis.core.server.management.ManagementService; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; - -public class AMQServerConsumer extends ServerConsumerImpl { - - // TODO-NOW: remove this once unified - AMQConsumer amqConsumer; - - public AMQConsumer getAmqConsumer() { - return amqConsumer; - } - - /** TODO-NOW: remove this once unified */ - public void setAmqConsumer(AMQConsumer amqConsumer) { - this.amqConsumer = amqConsumer; - } - - public AMQServerConsumer(long consumerID, - AMQServerSession serverSession, - QueueBinding binding, - Filter filter, - boolean started, - boolean browseOnly, - StorageManager storageManager, - SessionCallback callback, - boolean preAcknowledge, - boolean strictUpdateDeliveryCount, - ManagementService managementService, - boolean supportLargeMessage, - Integer credits, - final ActiveMQServer server) throws Exception { - super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); - } - - public void amqPutBackToDeliveringList(final List refs) { - synchronized (this.deliveringRefs) { - for (MessageReference ref : refs) { - ref.incrementDeliveryCount(); - deliveringRefs.add(ref); - } - //adjust the order. Suppose deliveringRefs has 2 existing - //refs m1, m2, and refs has 3 m3, m4, m5 - //new order must be m3, m4, m5, m1, m2 - if (refs.size() > 0) { - long first = refs.get(0).getMessage().getMessageID(); - MessageReference m = deliveringRefs.peek(); - while (m.getMessage().getMessageID() != first) { - deliveringRefs.poll(); - deliveringRefs.add(m); - m = deliveringRefs.peek(); - } - } - } - } - - public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception { - MessageReference ref = removeReferenceByID(mid); - - if (ref == null) { - throw new IllegalStateException("Cannot find ref to ack " + mid); - } - - ServerMessage coreMsg = ref.getMessage(); - coreMsg.putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, cause.toString()); - - QueueImpl queue = (QueueImpl) ref.getQueue(); - synchronized (queue) { - queue.sendToDeadLetterAddress(ref); - queue.decDelivering(); - } - } - -}