activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1240162 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/broker/ft/ test/java/org/apache...
Date Fri, 03 Feb 2012 13:43:37 GMT
Author: gtully
Date: Fri Feb  3 13:43:36 2012
New Revision: 1240162

URL: http://svn.apache.org/viewvc?rev=1240162&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3695: Failover using a JDBC Message Store and Virtual
Topic can result in a lost message if queue is empty. Problem is that an empty destination
is not recorded, as there is no entry in the messages table. Fix is to make use of the ack
table, in the same way a for durable subs. For destinations that match the virtual topic filter,
an entry out of priority range is added to the ack table. the startup destination query now
unions over the ack and messages table

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Feb  3 13:43:36 2012
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -74,7 +75,9 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.virtual.VirtualTopic;
 import org.apache.activemq.broker.scheduler.SchedulerBroker;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.network.ConnectionFilter;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
@@ -204,6 +207,7 @@ public class BrokerService implements Se
 
     private int offlineDurableSubscriberTimeout = -1;
     private int offlineDurableSubscriberTaskSchedule = 300000;
+    private DestinationFilter virtualConsumerDestinationFilter;
 
     static {
         String localHostName = "localhost";
@@ -2130,6 +2134,9 @@ public class BrokerService implements Se
                 getBroker().addDestination(adminConnectionContext, destination,true);
             }
         }
+        if (isUseVirtualTopics()) {
+            startVirtualConsumerDestinations();
+        }
     }
 
     /**
@@ -2297,28 +2304,40 @@ public class BrokerService implements Se
          }
     }
 
-    /**
-     * Starts all destiantions in persistence store. This includes all inactive
-     * destinations
-     */
-    protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
-        Set destinations = destinationFactory.getDestinations();
-        if (destinations != null) {
-            Iterator iter = destinations.iterator();
-            ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
-            if (adminConnectionContext == null) {
-                ConnectionContext context = new ConnectionContext();
-                context.setBroker(broker);
-                adminConnectionContext = context;
-                broker.setAdminConnectionContext(adminConnectionContext);
-            }
-            while (iter.hasNext()) {
-                ActiveMQDestination destination = (ActiveMQDestination) iter.next();
-                broker.addDestination(adminConnectionContext, destination,false);
+    protected void startVirtualConsumerDestinations() throws Exception {
+        ConnectionContext adminConnectionContext = getAdminConnectionContext();
+        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
+        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
+        if (!destinations.isEmpty()) {
+            for (ActiveMQDestination destination : destinations) {
+                if (filter.matches(destination) == true) {
+                    broker.addDestination(adminConnectionContext, destination, false);
+                }
             }
         }
     }
 
+    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
+        // created at startup, so no sync needed
+        if (virtualConsumerDestinationFilter == null) {
+            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
+            for (DestinationInterceptor interceptor : destinationInterceptors) {
+                if (interceptor instanceof VirtualDestinationInterceptor) {
+                    VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor)
interceptor;
+                    for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations())
{
+                        if (virtualDestination instanceof VirtualTopic) {
+                            consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix()
+ DestinationFilter.ANY_DESCENDENT));
+                        }
+                    }
+                }
+            }
+            ActiveMQQueue filter = new ActiveMQQueue();
+            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
+            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
+        }
+        return virtualConsumerDestinationFilter;
+    }
+
     protected synchronized ThreadPoolExecutor getExecutor() {
         if (this.executor == null) {
         this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@@ -2568,4 +2587,9 @@ public class BrokerService implements Se
     public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule)
{
         this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
     }
+
+    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
+        return isUseVirtualTopics() && destination.isQueue() &&
+                getVirtualTopicConsumerDestinationFilter().matches(destination);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Fri Feb  3 13:43:36 2012
@@ -97,4 +97,6 @@ public interface JDBCAdapter {
     public int getMaxRows();
 
     public void setMaxRows(int maxRows);
+
+    void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws
SQLException, IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Fri Feb  3 13:43:36 2012
@@ -68,14 +68,33 @@ public class JDBCMessageStore extends Ab
 
     protected ActiveMQMessageAudit audit;
     
-    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
+    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws
IOException {
         super(destination);
         this.persistenceAdapter = persistenceAdapter;
         this.adapter = adapter;
         this.wireFormat = wireFormat;
         this.audit = audit;
+
+        if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination))
{
+            recordDestinationCreation(destination);
+        }
     }
-    
+
+    private void recordDestinationCreation(ActiveMQDestination destination) throws IOException
{
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            c = persistenceAdapter.getTransactionContext();
+            if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(),
destination.getQualifiedName()) < 0) {
+                adapter.doRecordDestination(c, destination);
+            }
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to record destination: " + destination
+ ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+    }
+
     public void addMessage(ConnectionContext context, Message message) throws IOException
{
         MessageId messageId = message.getMessageId();
         if (audit != null && audit.isDuplicate(message)) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Fri Feb  3 13:43:36 2012
@@ -112,7 +112,6 @@ public class JDBCPersistenceAdapter exte
     }
 
     public Set<ActiveMQDestination> getDestinations() {
-        // Get a connection and insert the message into the DB.
         TransactionContext c = null;
         try {
             c = getTransactionContext();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Fri Feb  3 13:43:36 2012
@@ -60,7 +60,7 @@ public class JDBCTopicMessageStore exten
     };
 
 
-    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
+    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException
{
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Fri Feb  3 13:43:36 2012
@@ -356,7 +356,8 @@ public class Statements {
 
     public String getFindAllDestinationsStatement() {
         if (findAllDestinationsStatement == null) {
-            findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
+            findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName()
+                    + " UNION DISTINCT SELECT DISTINCT CONTAINER FROM "  + getFullAckTableName();
         }
         return findAllDestinationsStatement;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Fri Feb  3 13:43:36 2012
@@ -17,8 +17,11 @@
 package org.apache.activemq.store.jdbc.adapter;
 
 import java.io.IOException;
+import java.io.PrintStream;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -771,6 +774,9 @@ public class DefaultJDBCAdapter implemen
             rs = s.executeQuery();
             if (rs.next()) {
                 result = rs.getLong(1);
+                if (result == 0 && rs.wasNull()) {
+                    result = -1;
+                }
             }
         } finally {
             cleanupExclusiveLock.readLock().unlock();
@@ -848,7 +854,30 @@ public class DefaultJDBCAdapter implemen
 
     public void setMaxRows(int maxRows) {
         this.maxRows = maxRows;
-    }    
+    }
+
+    @Override
+    public void doRecordDestination(TransactionContext c, ActiveMQDestination destination)
throws SQLException, IOException {
+        PreparedStatement s = null;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
+            s.setString(1, destination.getQualifiedName());
+            s.setString(2, destination.getQualifiedName());
+            s.setString(3, destination.getQualifiedName());
+            s.setString(4, null);
+            s.setLong(5, 0);
+            s.setString(6, destination.getQualifiedName());
+            s.setLong(7, 11);  // entry out of priority range
+
+            if (s.executeUpdate() != 1) {
+                throw new IOException("Could not create ack record for destination: " + destination);
+            }
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(s);
+        }
+    }
 
     /**
      * @param c

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Fri Feb  3 13:43:36 2012
@@ -21,9 +21,13 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,4 +112,21 @@ public class QueueMasterSlaveTest extend
         slave.set(broker);
         slaveStarted.countDown();
     }
+
+    public void testVirtualTopicFailover() throws Exception {
+
+        MessageConsumer qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
+        assertNull("No message there yet", qConsumer.receive(1000));
+        qConsumer.close();
+        master.stop();
+        assertTrue("slave started", slaveStarted.await(10, TimeUnit.SECONDS));
+
+        final String text = "ForUWhenSlaveKicksIn";
+        producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
+
+        qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
+        javax.jms.Message message = qConsumer.receive(4000);
+        assertNotNull("Get message after failover", message);
+        assertEquals("correct message", text, ((TextMessage)message).getText());
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java?rev=1240162&r1=1240161&r2=1240162&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java
Fri Feb  3 13:43:36 2012
@@ -30,6 +30,7 @@ public class JDBCNetworkBrokerDetachTest
         jdbc.setDataSource(dataSource);
         jdbc.deleteAllMessages();
         broker.setPersistenceAdapter(jdbc);
+        broker.setUseVirtualTopics(false);
     }
 	
 }



Mime
View raw message