Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E237518D2B for ; Fri, 1 Apr 2016 02:20:17 +0000 (UTC) Received: (qmail 55994 invoked by uid 500); 1 Apr 2016 02:20:17 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 55872 invoked by uid 500); 1 Apr 2016 02:20:17 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 55758 invoked by uid 99); 1 Apr 2016 02:20:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 02:20:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 814EBDFF81; Fri, 1 Apr 2016 02:20:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 01 Apr 2016 02:20:19 -0000 Message-Id: <9e2d865ae9bf4cd1b86a6f4f1f07b9ef@git.apache.org> In-Reply-To: <3e55bfe96b804ed3b315f0e1a0a90fc3@git.apache.org> References: <3e55bfe96b804ed3b315f0e1a0a90fc3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] activemq-artemis git commit: major refactoring on Transactions and AMQ objects http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java deleted file mode 100644 index 3f0259d..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.management.CoreNotificationType; -import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.filter.impl.FilterImpl; -import org.apache.activemq.artemis.core.persistence.OperationContext; -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.BindingType; -import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.postoffice.QueueBinding; -import org.apache.activemq.artemis.core.protocol.openwire.AMQTransactionImpl; -import org.apache.activemq.artemis.core.security.SecurityStore; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.QueueCreator; -import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; -import org.apache.activemq.artemis.core.server.impl.RefsOperation; -import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; -import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; -import org.apache.activemq.artemis.core.server.management.ManagementService; -import org.apache.activemq.artemis.core.server.management.Notification; -import org.apache.activemq.artemis.core.transaction.ResourceManager; -import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -import org.apache.activemq.artemis.utils.TypedProperties; -import org.apache.activemq.artemis.utils.UUID; - -public class AMQServerSession extends ServerSessionImpl { - - private boolean internal; - - public AMQServerSession(String name, - String username, - String password, - int minLargeMessageSize, - boolean autoCommitSends, - boolean autoCommitAcks, - boolean preAcknowledge, - boolean persistDeliveryCountBeforeDelivery, - boolean xa, - RemotingConnection connection, - StorageManager storageManager, - PostOffice postOffice, - ResourceManager resourceManager, - SecurityStore securityStore, - ManagementService managementService, - ActiveMQServerImpl activeMQServerImpl, - SimpleString managementAddress, - SimpleString simpleString, - SessionCallback callback, - QueueCreator queueCreator, - OperationContext context) throws Exception { - super(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, activeMQServerImpl, managementAddress, simpleString, callback, context, new AMQTransactionFactory(), queueCreator); - } - - @Override - protected void doClose(final boolean failed) throws Exception { - Set consumersClone = new HashSet<>(consumers.values()); - for (ServerConsumer consumer : consumersClone) { - AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer; - amqConsumer.setStarted(false); - } - - synchronized (this) { - if (tx != null && tx.getXid() == null) { - ((AMQTransactionImpl) tx).setRollbackForClose(); - } - } - super.doClose(failed); - } - - public AtomicInteger getConsumerCredits(final long consumerID) { - ServerConsumer consumer = consumers.get(consumerID); - - if (consumer == null) { - ActiveMQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID); - - return null; - } - - return ((ServerConsumerImpl) consumer).getAvailableCredits(); - } - - public void enableXA() throws Exception { - if (!this.xa) { - if (this.tx != null) { - //that's not expected, maybe a warning. - this.tx.rollback(); - this.tx = null; - } - - this.autoCommitAcks = false; - this.autoCommitSends = false; - - this.xa = true; - } - } - - public void enableTx() throws Exception { - if (this.xa) { - throw new IllegalStateException("Session is XA"); - } - - this.autoCommitAcks = false; - this.autoCommitSends = false; - - if (this.tx != null) { - //that's not expected, maybe a warning. - this.tx.rollback(); - this.tx = null; - } - - this.tx = newTransaction(); - } - - //amq specific behavior - - // TODO: move this to AMQSession - public void amqRollback(Set acked) throws Exception { - if (tx == null) { - // Might be null if XA - - tx = newTransaction(); - } - - RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION); - - if (oper != null) { - List ackRefs = oper.getReferencesToAcknowledge(); - Map> toAcks = new HashMap<>(); - for (MessageReference ref : ackRefs) { - Long consumerId = ref.getConsumerId(); - - if (this.consumers.containsKey(consumerId)) { - if (acked.contains(ref.getMessage().getMessageID())) { - List ackList = toAcks.get(consumerId); - if (ackList == null) { - ackList = new ArrayList<>(); - toAcks.put(consumerId, ackList); - } - ackList.add(ref); - } - } - else { - //consumer must have been closed, cancel to queue - ref.getQueue().cancel(tx, ref); - } - } - //iterate consumers - if (toAcks.size() > 0) { - Iterator>> iter = toAcks.entrySet().iterator(); - while (iter.hasNext()) { - Entry> entry = iter.next(); - ServerConsumer consumer = consumers.get(entry.getKey()); - ((AMQServerConsumer) consumer).amqPutBackToDeliveringList(entry.getValue()); - } - } - } - - tx.rollback(); - - if (xa) { - tx = null; - } - else { - tx = newTransaction(); - } - - } - - /** - * The failed flag is used here to control delivery count. - * If set to true the delivery count won't decrement. - */ - public void amqCloseConsumer(long consumerID, boolean failed) throws Exception { - final ServerConsumer consumer = consumers.get(consumerID); - - if (consumer != null) { - consumer.close(failed); - } - else { - ActiveMQServerLogger.LOGGER.cannotFindConsumer(consumerID); - } - } - - @Override - public ServerConsumer createConsumer(final long consumerID, - final SimpleString queueName, - final SimpleString filterString, - final boolean browseOnly, - final boolean supportLargeMessage, - final Integer credits) throws Exception { - if (this.internal) { - // Clebert TODO: PQP!!!!!!!!!!!!!!!!!!!! - - //internal sessions doesn't check security:: Why??? //// what's the reason for that? Where a link? - - Binding binding = postOffice.getBinding(queueName); - - if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) { - throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); - } - - Filter filter = FilterImpl.createFilter(filterString); - - ServerConsumer consumer = newConsumer(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits); - consumers.put(consumer.getID(), consumer); - - if (!browseOnly) { - TypedProperties props = new TypedProperties(); - - props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); - - props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName()); - - props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName()); - - props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance()); - - Queue theQueue = (Queue) binding.getBindable(); - - props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount()); - - // HORNETQ-946 - props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(username)); - - props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(this.remotingConnection.getRemoteAddress())); - - props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(name)); - - if (filterString != null) { - props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); - } - - Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); - - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("Session with user=" + username + ", connection=" + this.remotingConnection + " created a consumer on queue " + queueName + ", filter = " + filterString); - } - - managementService.sendNotification(notification); - } - - return consumer; - } - else { - return super.createConsumer(consumerID, queueName, filterString, browseOnly, supportLargeMessage, credits); - } - } - - @Override - public Queue createQueue(final SimpleString address, - final SimpleString name, - final SimpleString filterString, - final boolean temporary, - final boolean durable) throws Exception { - if (!this.internal) { - return super.createQueue(address, name, filterString, temporary, durable); - } - - Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary); - - if (temporary) { - // Temporary queue in core simply means the queue will be deleted if - // the remoting connection - // dies. It does not mean it will get deleted automatically when the - // session is closed. - // It is up to the user to delete the queue when finished with it - - TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, name); - - remotingConnection.addCloseListener(cleaner); - remotingConnection.addFailureListener(cleaner); - - tempQueueCleannerUppers.put(name, cleaner); - } - - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("Queue " + name + " created on address " + name + - " with filter=" + filterString + " temporary = " + - temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection); - } - - return queue; - } - - - // Clebert TODO: Get rid of these mthods - @Override - protected void doSend(final ServerMessage msg, final boolean direct) throws Exception { - if (!this.internal) { - super.doSend(msg, direct); - return; - } - - //bypass security check for internal sessions - if (tx == null || autoCommitSends) { - } - else { - routingContext.setTransaction(tx); - } - - try { - postOffice.route(msg, getQueueCreator(), routingContext, direct); - - Pair value = targetAddressInfos.get(msg.getAddress()); - - if (value == null) { - targetAddressInfos.put(msg.getAddress(), new Pair<>(msg.getUserID(), new AtomicLong(1))); - } - else { - value.setA(msg.getUserID()); - value.getB().incrementAndGet(); - } - } - finally { - routingContext.clear(); - } - } - - @Override - protected ServerConsumer newConsumer(long consumerID, - ServerSessionImpl serverSessionImpl, - QueueBinding binding, - Filter filter, - boolean started2, - boolean browseOnly, - StorageManager storageManager2, - SessionCallback callback2, - boolean preAcknowledge2, - boolean strictUpdateDeliveryCount2, - ManagementService managementService2, - boolean supportLargeMessage, - Integer credits) throws Exception { - return new AMQServerConsumer(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, this.server); - } - - public AMQServerConsumer getConsumer(long nativeId) { - return (AMQServerConsumer) this.consumers.get(nativeId); - } - - public void setInternal(boolean internal) { - this.internal = internal; - } - - public boolean isInternal() { - return this.internal; - } - - public void moveToDeadLetterAddress(long consumerId, long mid, Throwable cause) throws Exception { - AMQServerConsumer consumer = getConsumer(consumerId); - consumer.moveToDeadLetterAddress(mid, cause); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java deleted file mode 100644 index a6ca4a0..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.persistence.OperationContext; -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.security.SecurityStore; -import org.apache.activemq.artemis.core.server.QueueCreator; -import org.apache.activemq.artemis.core.server.ServerSessionFactory; -import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; -import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; -import org.apache.activemq.artemis.core.server.management.ManagementService; -import org.apache.activemq.artemis.core.transaction.ResourceManager; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; - -public class AMQServerSessionFactory implements ServerSessionFactory { - - private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory(); - - public static AMQServerSessionFactory getInstance() { - return singleInstance; - } - - private AMQServerSessionFactory() { - } - - @Override - public ServerSessionImpl createCoreSession(String name, - String username, - String password, - int minLargeMessageSize, - boolean autoCommitSends, - boolean autoCommitAcks, - boolean preAcknowledge, - boolean persistDeliveryCountBeforeDelivery, - boolean xa, - RemotingConnection connection, - StorageManager storageManager, - PostOffice postOffice, - ResourceManager resourceManager, - SecurityStore securityStore, - ManagementService managementService, - ActiveMQServerImpl activeMQServerImpl, - SimpleString managementAddress, - SimpleString simpleString, - SessionCallback callback, - QueueCreator queueCreator, - OperationContext context) throws Exception { - return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, activeMQServerImpl, managementAddress, simpleString, callback, queueCreator, context); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 4675dca..b71f480 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -18,12 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import javax.jms.ResourceAllocationException; import javax.transaction.xa.Xid; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,15 +27,15 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; +import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; -import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -48,16 +43,13 @@ import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.wireformat.WireFormat; @@ -65,26 +57,18 @@ import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session - protected final IDGenerator idGenerator = new SimpleIDGenerator(0); + protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); private ConnectionInfo connInfo; - private AMQServerSession coreSession; + private ServerSession coreSession; private SessionInfo sessInfo; private ActiveMQServer server; private OpenWireConnection connection; - private Map consumers = new ConcurrentHashMap<>(); - private AtomicBoolean started = new AtomicBoolean(false); - private TransactionId txId = null; - - private boolean isTx; - private final ScheduledExecutorService scheduledPool; - private OpenWireProtocolManager manager; - // The sessionWireformat used by the session // this object is meant to be used per thread / session // so we make a new one per AMQSession @@ -94,20 +78,22 @@ public class AMQSession implements SessionCallback { SessionInfo sessInfo, ActiveMQServer server, OpenWireConnection connection, - ScheduledExecutorService scheduledPool, - OpenWireProtocolManager manager) { + ScheduledExecutorService scheduledPool) { this.connInfo = connInfo; this.sessInfo = sessInfo; this.server = server; this.connection = connection; this.scheduledPool = scheduledPool; - this.manager = manager; OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller(); this.converter = new OpenWireMessageConverter(marshaller.copy()); } + public boolean isClosed() { + return coreSession.isClosed(); + } + public OpenWireMessageConverter getConverter() { return converter; } @@ -122,7 +108,7 @@ public class AMQSession implements SessionCallback { // now try { - coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true); + coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true); long sessionId = sessInfo.getSessionId().getValue(); if (sessionId == -1) { @@ -136,8 +122,8 @@ public class AMQSession implements SessionCallback { } public List createConsumer(ConsumerInfo info, - AMQSession amqSession, - SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { + AMQSession amqSession, + SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { //check destination ActiveMQDestination dest = info.getDestination(); ActiveMQDestination[] dests = null; @@ -147,7 +133,7 @@ public class AMQSession implements SessionCallback { else { dests = new ActiveMQDestination[]{dest}; } -// Map consumerMap = new HashMap<>(); + List consumersList = new java.util.LinkedList<>(); for (ActiveMQDestination openWireDest : dests) { @@ -157,9 +143,9 @@ public class AMQSession implements SessionCallback { } AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool); - consumer.init(slowConsumerDetectionListener, idGenerator.generateID()); + long nativeID = consumerIDGenerator.generateID(); + consumer.init(slowConsumerDetectionListener, nativeID); consumersList.add(consumer); - consumers.put(consumer.getNativeId(), consumer); } return consumersList; @@ -180,7 +166,7 @@ public class AMQSession implements SessionCallback { @Override public void browserFinished(ServerConsumer consumer) { - AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer(); + AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); if (theConsumer != null) { theConsumer.browseFinished(); } @@ -204,13 +190,20 @@ public class AMQSession implements SessionCallback { } @Override - public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) { - AMQConsumer consumer = consumers.get(consumerID.getID()); - return consumer.handleDeliver(message, deliveryCount); + public int sendMessage(MessageReference reference, + ServerMessage message, + ServerConsumer consumer, + int deliveryCount) { + AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); + return theConsumer.handleDeliver(reference, message, deliveryCount); } @Override - public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) { + public int sendLargeMessage(MessageReference reference, + ServerMessage message, + ServerConsumer consumerID, + long bodySize, + int deliveryCount) { // TODO Auto-generated method stub return 0; } @@ -231,16 +224,15 @@ public class AMQSession implements SessionCallback { } @Override - public boolean hasCredits(ServerConsumer consumerID) { + public boolean hasCredits(ServerConsumer consumer) { - AMQConsumer amqConsumer; + AMQConsumer amqConsumer = null; - amqConsumer = consumers.get(consumerID.getID()); - - if (amqConsumer != null) { - return amqConsumer.hasCredits(); + if (consumer.getProtocolData() != null) { + amqConsumer = (AMQConsumer) consumer.getProtocolData(); } - return false; + + return amqConsumer != null && amqConsumer.hasCredits(); } @Override @@ -252,11 +244,6 @@ public class AMQSession implements SessionCallback { public void send(final ProducerInfo producerInfo, final Message messageSend, boolean sendProducerAck) throws Exception { - TransactionId tid = messageSend.getTransactionId(); - if (tid != null) { - resetSessionTx(tid); - } - messageSend.setBrokerInTime(System.currentTimeMillis()); ActiveMQDestination destination = messageSend.getDestination(); @@ -376,7 +363,7 @@ public class AMQSession implements SessionCallback { } } - public AMQServerSession getCoreSession() { + public ServerSession getCoreSession() { return this.coreSession; } @@ -384,134 +371,10 @@ public class AMQSession implements SessionCallback { return this.server; } - public void removeConsumer(long consumerId) throws Exception { - boolean failed = !(this.txId != null || this.isTx); - - coreSession.amqCloseConsumer(consumerId, failed); - consumers.remove(consumerId); - } - public WireFormat getMarshaller() { return this.connection.getMarshaller(); } - public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception { - TransactionId tid = ack.getTransactionId(); - if (tid != null) { - this.resetSessionTx(ack.getTransactionId()); - } - consumer.acknowledge(ack); - - if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE) { - this.coreSession.commit(); - } - } - - //AMQ session and transactions are create separately. Whether a session - //is transactional or not is known only when a TransactionInfo command - //comes in. - public void resetSessionTx(TransactionId xid) throws Exception { - if ((this.txId != null) && (!this.txId.equals(xid))) { - throw new IllegalStateException("Session already associated with a tx"); - } - - this.isTx = true; - if (this.txId == null) { - //now reset session - this.txId = xid; - - if (xid.isXATransaction()) { - XATransactionId xaXid = (XATransactionId) xid; - coreSession.enableXA(); - XidImpl coreXid = new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId()); - coreSession.xaStart(coreXid); - } - else { - coreSession.enableTx(); - } - - this.manager.registerTx(this.txId, this); - } - } - - private void checkTx(TransactionId inId) { - if (this.txId == null) { - throw new IllegalStateException("Session has no transaction associated with it"); - } - - if (!this.txId.equals(inId)) { - throw new IllegalStateException("Session already associated with another tx"); - } - - this.isTx = true; - } - - public void endTransaction(TransactionInfo info) throws Exception { - checkTx(info.getTransactionId()); - - if (txId.isXATransaction()) { - XATransactionId xid = (XATransactionId) txId; - XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); - this.coreSession.xaEnd(coreXid); - } - } - - public void commitOnePhase(TransactionInfo info) throws Exception { - checkTx(info.getTransactionId()); - - if (txId.isXATransaction()) { - XATransactionId xid = (XATransactionId) txId; - XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); - this.coreSession.xaCommit(coreXid, true); - } - else { - Iterator iter = consumers.values().iterator(); - while (iter.hasNext()) { - AMQConsumer consumer = iter.next(); - consumer.finishTx(); - } - this.coreSession.commit(); - } - - this.txId = null; - } - - public void prepareTransaction(XATransactionId xid) throws Exception { - checkTx(xid); - XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); - this.coreSession.xaPrepare(coreXid); - } - - public void commitTwoPhase(XATransactionId xid) throws Exception { - checkTx(xid); - XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); - this.coreSession.xaCommit(coreXid, false); - - this.txId = null; - } - - public void rollback(TransactionInfo info) throws Exception { - checkTx(info.getTransactionId()); - if (this.txId.isXATransaction()) { - XATransactionId xid = (XATransactionId) txId; - XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); - this.coreSession.xaRollback(coreXid); - } - else { - Iterator iter = consumers.values().iterator(); - Set acked = new HashSet<>(); - while (iter.hasNext()) { - AMQConsumer consumer = iter.next(); - consumer.rollbackTx(acked); - } - //on local rollback, amq broker doesn't do anything about the delivered - //messages, which stay at clients until next time - this.coreSession.amqRollback(acked); - } - - this.txId = null; - } - public void recover(List recovered) { List xids = this.coreSession.xaGetInDoubtXids(); for (Xid xid : xids) { @@ -520,24 +383,12 @@ public class AMQSession implements SessionCallback { } } - public void forget(final TransactionId tid) throws Exception { - checkTx(tid); - XATransactionId xid = (XATransactionId) tid; - XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); - this.coreSession.xaForget(coreXid); - this.txId = null; - } - public ConnectionInfo getConnectionInfo() { return this.connInfo; } - public void setInternal(boolean internal) { - this.coreSession.setInternal(internal); - } - - public boolean isInternal() { - return this.coreSession.isInternal(); + public void disableSecurity() { + this.coreSession.disableSecurity(); } public void deliverMessage(MessageDispatch dispatch) { @@ -548,20 +399,6 @@ public class AMQSession implements SessionCallback { this.coreSession.close(false); } - public AMQConsumer getConsumer(Long coreConsumerId) { - return consumers.get(coreConsumerId); - } - - public void updateConsumerPrefetchSize(ConsumerId consumerId, int prefetch) { - Iterator iterator = consumers.values().iterator(); - while (iterator.hasNext()) { - AMQConsumer consumer = iterator.next(); - if (consumer.getId().equals(consumerId)) { - consumer.setPrefetchSize(prefetch); - } - } - } - public OpenWireConnection getConnection() { return connection; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java index b29c448..e02638e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java @@ -40,6 +40,12 @@ public class AMQSingleConsumerBrokerExchange extends AMQConsumerBrokerExchange { @Override public void acknowledge(MessageAck ack) throws Exception { - amqSession.acknowledge(ack, consumer); + consumer.acknowledge(ack); } + + @Override + public void updateConsumerPrefetchSize(int prefetch) { + consumer.setPrefetchSize(prefetch); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java deleted file mode 100644 index 3a47333..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import javax.transaction.xa.Xid; - -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.protocol.openwire.AMQTransactionImpl; -import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.core.transaction.TransactionFactory; - -public class AMQTransactionFactory implements TransactionFactory { - - @Override - public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) { - return new AMQTransactionImpl(xid, storageManager, timeoutSeconds); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java deleted file mode 100644 index 005dd2e..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -import org.apache.activemq.command.MessageId; - -public class MessageInfo { - - public MessageId amqId; - public long nativeId; - public int size; - //mark message that is acked within a local tx - public boolean localAcked; - - public MessageInfo(MessageId amqId, long nativeId, int size) { - this.amqId = amqId; - this.nativeId = nativeId; - this.size = size; - } - - @Override - public String toString() { - return "native mid: " + this.nativeId + " amqId: " + amqId + " local acked: " + localAcked; - } - - public void setLocalAcked(boolean ack) { - localAcked = ack; - } - - public boolean isLocalAcked() { - return localAcked; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java new file mode 100644 index 0000000..1c64676 --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.openwire.util; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.transaction.impl.XidImpl; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.util.ByteSequence; + +public class OpenWireUtil { + + public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) { + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length); + + buffer.writeBytes(bytes.data, bytes.offset, bytes.length); + return buffer; + } + + public static SimpleString toCoreAddress(ActiveMQDestination dest) { + if (dest.isQueue()) { + return new SimpleString("jms.queue." + dest.getPhysicalName()); + } + else { + return new SimpleString("jms.topic." + dest.getPhysicalName()); + } + } + + /** + * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the + * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was + * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the + * consumer + */ + public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) { + String address = message.getAddress().toString(); + String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", ""); + if (actualDestination.isQueue()) { + return new ActiveMQQueue(strippedAddress); + } + else { + return new ActiveMQTopic(strippedAddress); + } + } + + /* + *This util converts amq wildcards to compatible core wildcards + *The conversion is like this: + *AMQ * wildcard --> Core * wildcard (no conversion) + *AMQ > wildcard --> Core # wildcard + */ + public static String convertWildcard(String physicalName) { + return physicalName.replaceAll("(\\.>)+", ".#"); + } + + public static XidImpl toXID(TransactionId xaXid) { + return toXID((XATransactionId)xaXid); + } + + public static XidImpl toXID(XATransactionId xaXid) { + return new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 0cad259..b3e52af 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -226,7 +226,7 @@ class StompProtocolManager implements ProtocolManager { if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor())); String name = UUIDGenerator.getInstance().generateStringUUID(); - ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, null, true); + ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true); stompSession.setServerSession(session); sessions.put(connection.getID(), stompSession); } @@ -239,7 +239,7 @@ class StompProtocolManager implements ProtocolManager { if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor)); String name = UUIDGenerator.getInstance().generateStringUUID(); - ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, null, true); + ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true); stompSession.setServerSession(session); transactedSessions.put(txID, stompSession); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index a6cbe71..9b5c70d 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; @@ -118,7 +119,7 @@ public class StompSession implements SessionCallback { } @Override - public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { LargeServerMessageImpl largeMessage = null; ServerMessage newServerMessage = serverMessage; try { @@ -207,7 +208,7 @@ public class StompSession implements SessionCallback { } @Override - public int sendLargeMessage(ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount) { + public int sendLargeMessage(MessageReference ref, ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount) { return 0; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 82b0e92..99e9160 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.transaction.Transaction; public class PagedReferenceImpl implements PagedReference { @@ -48,6 +49,18 @@ public class PagedReferenceImpl implements PagedReference { private boolean alreadyAcked; + private Object protocolData; + + @Override + public Object getProtocolData() { + return protocolData; + } + + @Override + public void setProtocolData(Object protocolData) { + this.protocolData = protocolData; + } + @Override public ServerMessage getMessage() { return getPagedMessage().getMessage(); @@ -199,9 +212,19 @@ public class PagedReferenceImpl implements PagedReference { subscription.ack(this); } + @Override + public void acknowledge(Transaction tx) throws Exception { + if (tx == null) { + getQueue().acknowledge(this); + } + else { + getQueue().acknowledge(tx, this); + } + } + /* (non-Javadoc) - * @see java.lang.Object#toString() - */ + * @see java.lang.Object#toString() + */ @Override public String toString() { String msgToString; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 2de5adb..0e5cd2f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -149,7 +149,7 @@ public class ActiveMQPacketHandler implements ChannelHandler { } ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), - new CoreSessionCallback(request.getName(), protocolManager, channel, connection), null, true); + new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); channel.setHandler(handler); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index c05a288..9d6125b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -56,7 +57,7 @@ public final class CoreSessionCallback implements SessionCallback { } @Override - public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { + public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); channel.send(packet); @@ -79,7 +80,7 @@ public final class CoreSessionCallback implements SessionCallback { } @Override - public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount); int size = 0; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 64633bb..b47df20 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -139,7 +139,6 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean xa, String defaultAddress, SessionCallback callback, - ServerSessionFactory sessionFactory, boolean autoCreateQueues) throws Exception; SecurityStore getSecurityStore(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 0ff55ac..b1e0dde 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.core.transaction.Transaction; + /** * A reference to a message. * @@ -35,6 +37,14 @@ public interface MessageReference { */ int getMessageMemoryEstimate(); + /** To be used on holding protocol specific data during the delivery. + * This will be only valid while the message is on the delivering queue at the consumer */ + Object getProtocolData(); + + /** To be used on holding protocol specific data during the delivery. + * This will be only valid while the message is on the delivering queue at the consumer */ + void setProtocolData(Object data); + MessageReference copy(Queue queue); /** @@ -61,6 +71,8 @@ public interface MessageReference { void acknowledge() throws Exception; + void acknowledge(Transaction tx) throws Exception; + void setConsumerId(Long consumerID); Long getConsumerId(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index c92325a..ec9d4a3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -150,6 +150,8 @@ public interface Queue extends Bindable { int sendMessagesToDeadLetterAddress(Filter filter) throws Exception; + void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception; + boolean changeReferencePriority(long messageID, byte newPriority) throws Exception; int changeReferencesPriority(Filter filter, byte newPriority) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java index d75efdd..d157a8c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java @@ -31,6 +31,12 @@ public interface ServerConsumer extends Consumer { void fireSlowConsumer(); + /** this is to be used with anything specific on a protocol head. */ + Object getProtocolData(); + + /** this is to be used with anything specific on a protocol head. */ + void setProtocolData(Object protocolData); + /** * @param protocolContext * @see #getProtocolContext() @@ -68,6 +74,12 @@ public interface ServerConsumer extends Consumer { MessageReference removeReferenceByID(long messageID) throws Exception; + /** Some protocols may choose to send the message back to delivering instead of redeliver. + * For example openwire will redeliver through the client, so messages will go back to delivering list after rollback. */ + void backToDelivering(MessageReference reference); + + List getDeliveringReferencesBasedOnProtocol(boolean remove, Object protocolDataStart, Object protocolDataEnd); + void acknowledge(Transaction tx, long messageID) throws Exception; void individualAcknowledge(Transaction tx, long messageID) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 62bb3b5..b7a7c47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -36,9 +36,19 @@ public interface ServerSession extends SecurityAuth { Object getConnectionID(); + /** + * Certain protocols may create an internal session that shouldn't go through security checks. + * make sure you don't expose this property through any protocol layer as that would be a security breach + */ + void enableSecurity(); + + void disableSecurity(); + @Override RemotingConnection getRemotingConnection(); + Transaction newTransaction(); + boolean removeConsumer(long consumerID) throws Exception; void acknowledge(long consumerID, long messageID) throws Exception; @@ -87,6 +97,11 @@ public interface ServerSession extends SecurityAuth { void stop(); + /** + * To be used by protocol heads that needs to control the transaction outside the session context. + */ + void resetTX(Transaction transaction); + Queue createQueue(SimpleString address, SimpleString name, SimpleString filterString, @@ -100,6 +115,13 @@ public interface ServerSession extends SecurityAuth { SimpleString filterString, boolean browseOnly) throws Exception; + ServerConsumer createConsumer(final long consumerID, + final SimpleString queueName, + final SimpleString filterString, + final boolean browseOnly, + final boolean supportLargeMessage, + final Integer credits) throws Exception; + QueueQueryResult executeQueueQuery(SimpleString name) throws Exception; BindingQueryResult executeBindingQuery(SimpleString address) throws Exception; @@ -151,6 +173,10 @@ public interface ServerSession extends SecurityAuth { Transaction getCurrentTransaction(); + ServerConsumer locateConsumer(long consumerID) throws Exception; + + boolean isClosed(); + void createSharedQueue(SimpleString address, SimpleString name, boolean durable, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 13a1283..7ce7270 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1091,7 +1091,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean xa, final String defaultAddress, final SessionCallback callback, - final ServerSessionFactory sessionFactory, final boolean autoCreateQueues) throws Exception { if (securityStore != null) { @@ -1105,7 +1104,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(username); final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor()); - final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory, autoCreateQueues); + final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues); sessions.put(name, session); @@ -1178,14 +1177,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { String defaultAddress, SessionCallback callback, OperationContext context, - ServerSessionFactory sessionFactory, boolean autoCreateJMSQueues) throws Exception { - if (sessionFactory == null) { - return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null); - } - else { - return sessionFactory.createCoreSession(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, jmsQueueCreator, context); - } + return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 9feb60e..932c260 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.transaction.Transaction; /** * A queue that will discard messages if a newer message with the same @@ -188,6 +189,16 @@ public class LastValueQueue extends QueueImpl { } @Override + public Object getProtocolData() { + return ref.getProtocolData(); + } + + @Override + public void setProtocolData(Object data) { + ref.setProtocolData(data); + } + + @Override public void setAlreadyAcked() { ref.setAlreadyAcked(); } @@ -247,6 +258,11 @@ public class LastValueQueue extends QueueImpl { } @Override + public void acknowledge(Transaction tx) throws Exception { + ref.acknowledge(tx); + } + + @Override public void setPersistedCount(int count) { ref.setPersistedCount(count); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index fd04b6d..de4d5ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.MemorySize; /** @@ -42,6 +43,8 @@ public class MessageReferenceImpl implements MessageReference { private boolean alreadyAcked; + private Object protocolData; + // Static -------------------------------------------------------- private static final int memoryOffset; @@ -86,6 +89,16 @@ public class MessageReferenceImpl implements MessageReference { // MessageReference implementation ------------------------------- + @Override + public Object getProtocolData() { + return protocolData; + } + + @Override + public void setProtocolData(Object protocolData) { + this.protocolData = protocolData; + } + /** * @return the persistedCount */ @@ -174,7 +187,16 @@ public class MessageReferenceImpl implements MessageReference { @Override public void acknowledge() throws Exception { - queue.acknowledge(this); + this.acknowledge(null); + } + + public void acknowledge(Transaction tx) throws Exception { + if (tx == null) { + getQueue().acknowledge(this); + } + else { + getQueue().acknowledge(tx, this); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 86ca36c..75f0f98 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1074,7 +1074,7 @@ public class QueueImpl implements Queue { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName()); } - move(expiryAddress, ref, true, false); + move(null, expiryAddress, ref, true, false); } else { if (isTrace) { @@ -1461,7 +1461,7 @@ public class QueueImpl implements Queue { MessageReference ref = iter.next(); if (ref.getMessage().getMessageID() == messageID) { incDelivering(); - sendToDeadLetterAddress(ref); + sendToDeadLetterAddress(null, ref); iter.remove(); refRemoved(ref); return true; @@ -1480,7 +1480,7 @@ public class QueueImpl implements Queue { MessageReference ref = iter.next(); if (filter == null || filter.match(ref.getMessage())) { incDelivering(); - sendToDeadLetterAddress(ref); + sendToDeadLetterAddress(null, ref); iter.remove(); refRemoved(ref); count++; @@ -1507,7 +1507,7 @@ public class QueueImpl implements Queue { refRemoved(ref); incDelivering(); try { - move(toAddress, ref, false, rejectDuplicate); + move(null, toAddress, ref, false, rejectDuplicate); } catch (Exception e) { decDelivering(); @@ -2120,7 +2120,7 @@ public class QueueImpl implements Queue { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName()); } - sendToDeadLetterAddress(reference, addressSettings.getDeadLetterAddress()); + sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress()); return false; } @@ -2337,36 +2337,45 @@ public class QueueImpl implements Queue { } } - public void sendToDeadLetterAddress(final MessageReference ref) throws Exception { - sendToDeadLetterAddress(ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); + public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { + sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); } - private void sendToDeadLetterAddress(final MessageReference ref, + private void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref, final SimpleString deadLetterAddress) throws Exception { if (deadLetterAddress != null) { Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress); if (bindingList.getBindings().isEmpty()) { ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress); - acknowledge(ref); + ref.acknowledge(tx); } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); - move(deadLetterAddress, ref, false, false); + move(tx, deadLetterAddress, ref, false, false); } } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(name); - acknowledge(ref); + ref.acknowledge(tx); } } - private void move(final SimpleString address, + private void move(final Transaction originalTX, + final SimpleString address, final MessageReference ref, final boolean expiry, final boolean rejectDuplicate) throws Exception { - Transaction tx = new TransactionImpl(storageManager); + Transaction tx; + + if (originalTX != null) { + tx = originalTX; + } + else { + // if no TX we create a new one to commit at the end + tx = new TransactionImpl(storageManager); + } ServerMessage copyMessage = makeCopy(ref, expiry); @@ -2376,7 +2385,9 @@ public class QueueImpl implements Queue { acknowledge(tx, ref); - tx.commit(); + if (originalTX == null) { + tx.commit(); + } } /* http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 14f22ed..deeab74 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -83,10 +84,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private final ServerSession session; - private final Object lock = new Object(); + protected final Object lock = new Object(); private final boolean supportLargeMessage; + private Object protocolData; + private Object protocolContext; private final ActiveMQServer server; @@ -123,7 +126,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private final StorageManager storageManager; - protected final java.util.Queue deliveringRefs = new ConcurrentLinkedQueue<>(); + protected final java.util.Deque deliveringRefs = new ConcurrentLinkedDeque<>(); private final SessionCallback callback; @@ -231,6 +234,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // ---------------------------------------------------------------------- @Override + public Object getProtocolData() { + return protocolData; + } + + @Override + public void setProtocolData(Object protocolData) { + this.protocolData = protocolData; + } + + @Override public void setlowConsumerDetection(SlowConsumerDetectionListener listener) { this.slowConsumerListener = listener; } @@ -524,7 +537,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.setAddress(messageQueue.getName()); - callback.sendMessage(forcedDeliveryMessage, ServerConsumerImpl.this, 0); + callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); } } } @@ -560,7 +573,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (!deliveringRefs.isEmpty()) { for (MessageReference ref : deliveringRefs) { if (performACK) { - ackReference(tx, ref); + ref.acknowledge(tx); performACK = false; } @@ -713,6 +726,44 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return messageQueue; } + + /** Remove references based on the protocolData. + * there will be an interval defined between protocolDataStart and protocolDataEnd. + * This method will fetch the delivering references, remove them from the delivering list and return a list. + * + * This will be useful for other protocols that will need this such as openWire or MQTT. */ + public List getDeliveringReferencesBasedOnProtocol(boolean remove, Object protocolDataStart, Object protocolDataEnd) { + LinkedList retReferences = new LinkedList<>(); + boolean hit = false; + synchronized (lock) { + Iterator referenceIterator = deliveringRefs.iterator(); + + while (referenceIterator.hasNext()) { + MessageReference reference = referenceIterator.next(); + + if (!hit) { + hit = reference.getProtocolData() != null && reference.getProtocolData().equals(protocolDataStart); + } + + // notice: this is not an else clause, this is also valid for the first hit + if (hit) { + if (remove) { + referenceIterator.remove(); + } + retReferences.add(reference); + + // Whenever this is met we interrupt the loop + // even on the first hit + if (reference.getProtocolData() != null && reference.getProtocolData().equals(protocolDataEnd)) { + break; + } + } + } + } + + return retReferences; + } + @Override public void acknowledge(Transaction tx, final long messageID) throws Exception { if (browseOnly) { @@ -750,7 +801,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { throw ils; } - ackReference(tx, ref); + ref.acknowledge(tx); + acks++; } while (ref.getMessage().getMessageID() != messageID); @@ -780,15 +832,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } - private void ackReference(Transaction tx, MessageReference ref) throws Exception { - if (tx == null) { - ref.getQueue().acknowledge(ref); - } - else { - ref.getQueue().acknowledge(tx, ref); - } - } - @Override public void individualAcknowledge(Transaction tx, final long messageID) throws Exception { @@ -818,7 +861,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { throw ils; } - ackReference(tx, ref); + ref.acknowledge(tx); if (startedTransaction) { tx.commit(); @@ -866,6 +909,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.getQueue().cancel(ref, System.currentTimeMillis()); } + + @Override + public void backToDelivering(MessageReference reference) { + deliveringRefs.addFirst(reference); + } + @Override public MessageReference removeReferenceByID(final long messageID) throws Exception { if (browseOnly) { @@ -965,7 +1014,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * @param message */ private void deliverStandardMessage(final MessageReference ref, final ServerMessage message) { - int packetSize = callback.sendMessage(message, ServerConsumerImpl.this, ref.getDeliveryCount()); + int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); if (availableCredits != null) { availableCredits.addAndGet(-packetSize); @@ -1057,7 +1106,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { sentInitialPacket = true; - int packetSize = callback.sendLargeMessage(currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); + int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); if (availableCredits != null) { availableCredits.addAndGet(-packetSize);