Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5EA6911365 for ; Mon, 9 Jun 2014 11:36:43 +0000 (UTC) Received: (qmail 66863 invoked by uid 500); 9 Jun 2014 11:36:43 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 66820 invoked by uid 500); 9 Jun 2014 11:36:43 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 66810 invoked by uid 99); 9 Jun 2014 11:36:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Jun 2014 11:36:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EFB018B2960; Mon, 9 Jun 2014 11:36:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Date: Mon, 09 Jun 2014 11:36:43 -0000 Message-Id: In-Reply-To: <94c76c09456d49608d2c97232511b1a9@git.apache.org> References: <94c76c09456d49608d2c97232511b1a9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: https://issues.apache.org/jira/browse/AMQ-5212 - ensure uncontented access to concurrent destination map avoides deadlock, rerework https://issues.apache.org/jira/browse/AMQ-4952 to differenciate duplicates from send and recovered messa https://issues.apache.org/jira/browse/AMQ-5212 - ensure uncontented access to concurrent destination map avoides deadlock, rerework https://issues.apache.org/jira/browse/AMQ-4952 to differenciate duplicates from send and recovered messages from the store. https://issues.apache.org/jira/browse/AMQ-3454 benefits from getDestinationMap(destination) to get direct access to the map to determine existance. Additional test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/27b3a7c3 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/27b3a7c3 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/27b3a7c3 Branch: refs/heads/trunk Commit: 27b3a7c3440e3e04fae1a1eb766ed13733c27949 Parents: 785faa0 Author: gtully Authored: Mon Jun 9 12:31:39 2014 +0100 Committer: gtully Committed: Mon Jun 9 12:32:33 2014 +0100 ---------------------------------------------------------------------- .../java/org/apache/activemq/broker/Broker.java | 8 + .../apache/activemq/broker/BrokerFilter.java | 5 + .../org/apache/activemq/broker/EmptyBroker.java | 5 + .../org/apache/activemq/broker/ErrorBroker.java | 5 + .../activemq/broker/MutableBrokerFilter.java | 5 + .../activemq/broker/region/AbstractRegion.java | 7 +- .../activemq/broker/region/RegionBroker.java | 9 + .../region/cursors/AbstractStoreCursor.java | 13 +- .../activemq/security/AuthorizationBroker.java | 2 +- .../activemq/store/jdbc/JDBCMessageStore.java | 7 +- .../activemq/store/kahadb/KahaDBStore.java | 5 +- .../org/apache/activemq/bugs/AMQ5212Test.java | 204 +++++++++++++++++++ 12 files changed, 258 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java index c2fd132..5d052e9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker; import java.net.URI; +import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; @@ -144,6 +145,13 @@ public interface Broker extends Region, Service { ActiveMQDestination[] getDestinations() throws Exception; /** + * return a reference destination map of a region based on the destination type + * @param destination + * @return + */ + public Map getDestinationMap(ActiveMQDestination destination); + + /** * Gets a list of all the prepared xa transactions. * * @param context transaction ids http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index b1d3c18..132b46d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -73,6 +73,11 @@ public class BrokerFilter implements Broker { } @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return next.getDestinationMap(destination); + } + + @Override public Set getDestinations(ActiveMQDestination destination) { return next.getDestinations(destination); } http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 2d2e6ba..8185554 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -78,6 +78,11 @@ public class EmptyBroker implements Broker { } @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return Collections.EMPTY_MAP; + } + + @Override public Set getDestinations(ActiveMQDestination destination) { return Collections.EMPTY_SET; } http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index f692d8a..ae42141 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -67,6 +67,11 @@ public class ErrorBroker implements Broker { } @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return Collections.EMPTY_MAP; + } + + @Override public Set getDestinations(ActiveMQDestination destination) { return Collections.EMPTY_SET; } http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 112378a..2eea2e8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -83,6 +83,11 @@ public class MutableBrokerFilter implements Broker { } @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return getNext().getDestinationMap(destination); + } + + @Override public Set getDestinations(ActiveMQDestination destination) { return getNext().getDestinations(destination); } http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index e443d53..53e8cdd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -249,12 +249,7 @@ public abstract class AbstractRegion implements Region { } public Map getDestinationMap() { - destinationsLock.readLock().lock(); - try{ - return destinations; - } finally { - destinationsLock.readLock().unlock(); - } + return destinations; } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index da6e1fa..59b1b92 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -135,6 +135,15 @@ public class RegionBroker extends EmptyBroker { } @Override + public Map getDestinationMap(ActiveMQDestination destination) { + try { + return getRegion(destination).getDestinationMap(); + } catch (JMSException jmse) { + return Collections.emptyMap(); + } + } + + @Override public Set getDestinations(ActiveMQDestination destination) { try { return getRegion(destination).getDestinations(destination); http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index d0b1a39..b6f9b7e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -100,8 +100,15 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i recovered = true; storeHasMessages = true; } else { - LOG.warn("{} - cursor got duplicate: {}, {}", new Object[]{ this, message.getMessageId(), message.getPriority() }); - duplicate(message); + if (LOG.isDebugEnabled()) { + LOG.debug(this + " - cursor got duplicate: " + message.getMessageId() + "," + message.getPriority() + ", cached=" + cached, new Throwable("duplicate message detected")); + } else { + LOG.warn("{} - cursor got duplicate {}", regionDestination.getActiveMQDestination(), message.getMessageId()); + } + if (!cached || message.getMessageId().getEntryLocator() != null) { + // came from the store or was added to the jdbc store + duplicate(message); + } } return recovered; } @@ -195,8 +202,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i lastCachedId = node.getMessageId(); lastTx = node.getMessage().getTransactionId(); } else { - LOG.debug(this + " duplicate add {}", node.getMessage(), new Throwable("duplicated detected")); dealWithDuplicates(); + return; } } } else { http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java index db482ba..39d3c59 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java @@ -76,7 +76,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB } protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) { - Destination existing = this.getDestinationMap().get(destination); + Destination existing = this.getDestinationMap(destination).get(destination); if (existing != null) { return true; } http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 3c441b0..968b928 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -131,11 +131,8 @@ public class JDBCMessageStore extends AbstractMessageStore { } finally { c.close(); } - if (context != null && context.getXid() != null) { - message.getMessageId().setEntryLocator(sequenceId); - } else { - onAdd(messageId, sequenceId, message.getPriority()); - } + message.getMessageId().setEntryLocator(sequenceId); + onAdd(messageId, sequenceId, message.getPriority()); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 74425f1..60c0738 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -281,8 +281,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (brokerService != null) { RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); if (regionBroker != null) { - Set destinationSet = regionBroker.getDestinations(convert(commandDestination)); - for (Destination destination : destinationSet) { + ActiveMQDestination activeMQDestination = convert(commandDestination); + Destination destination = regionBroker.getDestinationMap(activeMQDestination).get(activeMQDestination); + if (destination != null) { destination.getDestinationStatistics().getMessages().decrement(); destination.getDestinationStatistics().getEnqueues().decrement(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/27b3a7c3/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java new file mode 100644 index 0000000..64d57a5 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AMQ5212Test { + + BrokerService brokerService; + + @Before + public void setUp() throws Exception { + start(true); + } + + public void start(boolean deleteAllMessages) throws Exception { + brokerService = new BrokerService(); + if (deleteAllMessages) { + brokerService.deleteAllMessages(); + } + brokerService.addConnector("tcp://localhost:0"); + brokerService.setAdvisorySupport(false); + brokerService.start(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + } + + @Test + public void verifyDuplicateSuppressionWithConsumer() throws Exception { + doVerifyDuplicateSuppression(100, 100, true); + } + + @Test + public void verifyDuplicateSuppression() throws Exception { + doVerifyDuplicateSuppression(100, 100, false); + } + + public void doVerifyDuplicateSuppression(final int numToSend, final int expectedTotalEnqueue, final boolean demand) throws Exception { + final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + final int concurrency = 40; + final AtomicInteger workCount = new AtomicInteger(numToSend); + ExecutorService executorService = Executors.newFixedThreadPool(concurrency); + for (int i = 0; i < concurrency; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + int i; + while ((i = workCount.getAndDecrement()) > 0) { + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("queue-" + i + "-" + + AMQ5212Test.class.getSimpleName()); + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + if (demand) { + // create demand so page in will happen + activeMQSession.createConsumer(dest); + } + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + activeMQConnection.close(); + + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + TimeUnit.SECONDS.sleep(1); + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + + assertEquals("total enqueue as expected", expectedTotalEnqueue, brokerService.getAdminView().getTotalEnqueueCount()); + } + + @Test + public void verifyConsumptionOnDuplicate() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("Q"); + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + + activeMQConnection.close(); + + // verify original can be consumed after restart + brokerService.stop(); + brokerService.start(false); + + connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); + Message received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + + activeMQConnection.close(); + } + + @Test + public void verifyClientAckConsumptionOnDuplicate() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("Q"); + + MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); + + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + + + Message received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + messageConsumer.close(); + + + messageConsumer = activeMQSession.createConsumer(dest); + received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + received.acknowledge(); + + activeMQConnection.close(); + } +}