Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 35D3499AE for ; Fri, 3 Feb 2012 13:44:05 +0000 (UTC) Received: (qmail 14000 invoked by uid 500); 3 Feb 2012 13:44:05 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 13928 invoked by uid 500); 3 Feb 2012 13:44:04 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13921 invoked by uid 99); 3 Feb 2012 13:44:03 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Feb 2012 13:44:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Feb 2012 13:43:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 052572388A9B for ; Fri, 3 Feb 2012 13:43:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1240162 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/broker/ft/ test/java/org/apache... 
Date: Fri, 03 Feb 2012 13:43:37 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120203134338.052572388A9B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Fri Feb 3 13:43:36 2012 New Revision: 1240162 URL: http://svn.apache.org/viewvc?rev=1240162&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3695: Failover using a JDBC Message Store and Virtual Topic can result in a lost message if queue is empty. Problem is that an empty destination is not recorded, as there is no entry in the messages table. Fix is to make use of the ack table, in the same way as for durable subs. For destinations that match the virtual topic filter, an entry out of priority range is added to the ack table. The startup destination query now unions over the ack and messages tables. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Feb 3 13:43:36 2012 @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -74,7 +75,9 @@ import org.apache.activemq.broker.region import org.apache.activemq.broker.region.virtual.VirtualTopic; import org.apache.activemq.broker.scheduler.SchedulerBroker; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.network.ConnectionFilter; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; @@ -204,6 +207,7 @@ public class BrokerService implements Se private int offlineDurableSubscriberTimeout = -1; private int offlineDurableSubscriberTaskSchedule = 300000; + private DestinationFilter virtualConsumerDestinationFilter; static { String localHostName = "localhost"; @@ -2130,6 +2134,9 @@ public class BrokerService implements Se getBroker().addDestination(adminConnectionContext, destination,true); } } + if (isUseVirtualTopics()) { + startVirtualConsumerDestinations(); + } } /** @@ -2297,28 +2304,40 @@ public class BrokerService implements Se } } - /** - * Starts all destiantions in persistence store. 
This includes all inactive - * destinations - */ - protected void startDestinationsInPersistenceStore(Broker broker) throws Exception { - Set destinations = destinationFactory.getDestinations(); - if (destinations != null) { - Iterator iter = destinations.iterator(); - ConnectionContext adminConnectionContext = broker.getAdminConnectionContext(); - if (adminConnectionContext == null) { - ConnectionContext context = new ConnectionContext(); - context.setBroker(broker); - adminConnectionContext = context; - broker.setAdminConnectionContext(adminConnectionContext); - } - while (iter.hasNext()) { - ActiveMQDestination destination = (ActiveMQDestination) iter.next(); - broker.addDestination(adminConnectionContext, destination,false); + protected void startVirtualConsumerDestinations() throws Exception { + ConnectionContext adminConnectionContext = getAdminConnectionContext(); + Set destinations = destinationFactory.getDestinations(); + DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); + if (!destinations.isEmpty()) { + for (ActiveMQDestination destination : destinations) { + if (filter.matches(destination) == true) { + broker.addDestination(adminConnectionContext, destination, false); + } } } } + private DestinationFilter getVirtualTopicConsumerDestinationFilter() { + // created at startup, so no sync needed + if (virtualConsumerDestinationFilter == null) { + Set consumerDestinations = new HashSet(); + for (DestinationInterceptor interceptor : destinationInterceptors) { + if (interceptor instanceof VirtualDestinationInterceptor) { + VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; + for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { + if (virtualDestination instanceof VirtualTopic) { + consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); + } + } + } + } + 
ActiveMQQueue filter = new ActiveMQQueue(); + filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); + virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); + } + return virtualConsumerDestinationFilter; + } + protected synchronized ThreadPoolExecutor getExecutor() { if (this.executor == null) { this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { @@ -2568,4 +2587,9 @@ public class BrokerService implements Se public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) { this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; } + + public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { + return isUseVirtualTopics() && destination.isQueue() && + getVirtualTopicConsumerDestinationFilter().matches(destination); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Fri Feb 3 13:43:36 2012 @@ -97,4 +97,6 @@ public interface JDBCAdapter { public int getMaxRows(); public void setMaxRows(int maxRows); + + void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Feb 3 13:43:36 2012 @@ -68,14 +68,33 @@ public class JDBCMessageStore extends Ab protected ActiveMQMessageAudit audit; - public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) { + public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { super(destination); this.persistenceAdapter = persistenceAdapter; this.adapter = adapter; this.wireFormat = wireFormat; this.audit = audit; + + if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) { + recordDestinationCreation(destination); + } } - + + private void recordDestinationCreation(ActiveMQDestination destination) throws IOException { + TransactionContext c = persistenceAdapter.getTransactionContext(); + try { + c = persistenceAdapter.getTransactionContext(); + if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) { + adapter.doRecordDestination(c, destination); + } + } catch (SQLException e) { + JDBCPersistenceAdapter.log("JDBC Failure: ", e); + throw IOExceptionSupport.create("Failed to record destination: " + destination + ". 
Reason: " + e, e); + } finally { + c.close(); + } + } + public void addMessage(ConnectionContext context, Message message) throws IOException { MessageId messageId = message.getMessageId(); if (audit != null && audit.isDuplicate(message)) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Fri Feb 3 13:43:36 2012 @@ -112,7 +112,6 @@ public class JDBCPersistenceAdapter exte } public Set getDestinations() { - // Get a connection and insert the message into the DB. 
TransactionContext c = null; try { c = getTransactionContext(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Feb 3 13:43:36 2012 @@ -60,7 +60,7 @@ public class JDBCTopicMessageStore exten }; - public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { + public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException { super(persistenceAdapter, adapter, wireFormat, topic, audit); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Fri Feb 3 13:43:36 2012 @@ -356,7 +356,8 @@ public class Statements { public String getFindAllDestinationsStatement() { if (findAllDestinationsStatement == null) { - findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName(); + findAllDestinationsStatement = 
"SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName() + + " UNION DISTINCT SELECT DISTINCT CONTAINER FROM " + getFullAckTableName(); } return findAllDestinationsStatement; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Feb 3 13:43:36 2012 @@ -17,8 +17,11 @@ package org.apache.activemq.store.jdbc.adapter; import java.io.IOException; +import java.io.PrintStream; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -771,6 +774,9 @@ public class DefaultJDBCAdapter implemen rs = s.executeQuery(); if (rs.next()) { result = rs.getLong(1); + if (result == 0 && rs.wasNull()) { + result = -1; + } } } finally { cleanupExclusiveLock.readLock().unlock(); @@ -848,7 +854,30 @@ public class DefaultJDBCAdapter implemen public void setMaxRows(int maxRows) { this.maxRows = maxRows; - } + } + + @Override + public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { + PreparedStatement s = null; + cleanupExclusiveLock.readLock().lock(); + try { + s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); + s.setString(1, destination.getQualifiedName()); + s.setString(2, destination.getQualifiedName()); + s.setString(3, destination.getQualifiedName()); + s.setString(4, 
null); + s.setLong(5, 0); + s.setString(6, destination.getQualifiedName()); + s.setLong(7, 11); // entry out of priority range + + if (s.executeUpdate() != 1) { + throw new IOException("Could not create ack record for destination: " + destination); + } + } finally { + cleanupExclusiveLock.readLock().unlock(); + close(s); + } + } /** * @param c Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Fri Feb 3 13:43:36 2012 @@ -21,9 +21,13 @@ import java.util.concurrent.CountDownLat import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.xbean.BrokerFactoryBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,4 +112,21 @@ public class QueueMasterSlaveTest extend slave.set(broker); slaveStarted.countDown(); } + + public void testVirtualTopicFailover() throws Exception { + + MessageConsumer qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1")); + assertNull("No message there yet", qConsumer.receive(1000)); + qConsumer.close(); + master.stop(); + assertTrue("slave started", slaveStarted.await(10, 
TimeUnit.SECONDS)); + + final String text = "ForUWhenSlaveKicksIn"; + producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text)); + + qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1")); + javax.jms.Message message = qConsumer.receive(4000); + assertNotNull("Get message after failover", message); + assertEquals("correct message", text, ((TextMessage)message).getText()); + } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java?rev=1240162&r1=1240161&r2=1240162&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java Fri Feb 3 13:43:36 2012 @@ -30,6 +30,7 @@ public class JDBCNetworkBrokerDetachTest jdbc.setDataSource(dataSource); jdbc.deleteAllMessages(); broker.setPersistenceAdapter(jdbc); + broker.setUseVirtualTopics(false); } }