activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r892242 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/a...
Date Fri, 18 Dec 2009 13:03:41 GMT
Author: gtully
Date: Fri Dec 18 13:03:40 2009
New Revision: 892242

URL: http://svn.apache.org/viewvc?rev=892242&view=rev
Log:
First cut of audit recovery for https://issues.apache.org/activemq/browse/AMQ-2540 - the test
case reproduces the issue. A little more tidy-up is needed so that the query is limited and so that the
audit is recovered at the persistence adapter rather than at the store level. Also related
to https://issues.apache.org/activemq/browse/AMQ-2473

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Dec 18 13:03:40 2009
@@ -541,6 +541,10 @@
         SessionId sessionId = id.getParentId();
         ConnectionId connectionId = sessionId.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
+        if (cs == null) {
+            throw new IllegalStateException("Cannot remove a consumer from a connection that
had not been registered: "
+                    + connectionId);
+        }
         SessionState ss = cs.getSessionState(sessionId);
         if (ss == null) {
             throw new IllegalStateException("Cannot remove a consumer from a session that
had not been registered: "
@@ -576,6 +580,9 @@
     public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws
Exception {
         ConnectionId connectionId = id.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
+        if (cs == null) {
+            throw new IllegalStateException("Cannot remove session from connection that had
not been registered: " + connectionId);
+        }
         SessionState session = cs.getSessionState(id);
         if (session == null) {
             throw new IllegalStateException("Cannot remove session that had not been registered:
" + id);
@@ -643,7 +650,7 @@
             }
         }
         registerConnectionState(info.getConnectionId(), state);
-        LOG.debug("Setting up new connection: " + getRemoteAddress());
+        LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address:
" + getRemoteAddress());
         // Setup the context.
         String clientId = info.getClientId();
         context = new ConnectionContext();
@@ -680,6 +687,7 @@
 
     public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
             throws InterruptedException {
+        LOG.debug("remove connection id: " + id);
         TransportConnectionState cs = lookupConnectionState(id);
         if (cs != null) {
             // Don't allow things to be added to the connection state while we

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Fri Dec 18 13:03:40 2009
@@ -85,9 +85,7 @@
             clearIterator(true);
             recovered = true;
         } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" cursor got duplicate: " + message);
-            }
+            LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor
got duplicate: " + message);
             storeHasMessages = true;
         }
         return recovered;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Fri Dec 18 13:03:40 2009
@@ -118,6 +118,9 @@
         // Restore the connections.
         for (Iterator<ConnectionState> iter = connectionStates.values().iterator();
iter.hasNext();) {
             ConnectionState connectionState = iter.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
+            }
             transport.oneway(connectionState.getInfo());
             restoreTempDestinations(transport, connectionState);
 
@@ -173,6 +176,9 @@
         // Restore the connection's sessions
         for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();)
{
             SessionState sessionState = (SessionState)iter2.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("session: " + sessionState.getInfo().getSessionId());
+            }
             transport.oneway(sessionState.getInfo());
 
             if (restoreProducers) {
@@ -207,6 +213,9 @@
         // Restore the session's producers
         for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();)
{
             ProducerState producerState = (ProducerState)iter3.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("producer: " + producerState.getInfo().getProducerId());
+            }
             transport.oneway(producerState.getInfo());
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Fri Dec 18 13:03:40 2009
@@ -81,4 +81,6 @@
     void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long
nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
 
     long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination
destination, String clientId, String subscriberName) throws SQLException, IOException;
+
+    void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit,
JDBCMessageIdScanListener listener) throws SQLException, IOException;
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java?rev=892242&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
Fri Dec 18 13:03:40 2009
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.command.MessageId;
+
+public interface JDBCMessageIdScanListener {
+    boolean messageId(MessageId id);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Fri Dec 18 13:03:40 2009
@@ -18,8 +18,6 @@
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.ActiveMQMessageAudit;
@@ -28,10 +26,8 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -57,8 +53,28 @@
         this.adapter = adapter;
         this.wireFormat = wireFormat;
         this.audit = audit;
+        initAudit();
     }
 
+    /*
+     * revisit: This can be destination agnostic and back in the jdbc persistence adapter
start
+     */
+    public void initAudit() {
+        if (audit != null) {
+            try {
+                TransactionContext c = persistenceAdapter.getTransactionContext(null);
+                adapter.doMessageIdScan(c, destination, 100, new JDBCMessageIdScanListener()
{
+                    public boolean messageId(MessageId id) {
+                        audit.isDuplicate(id);
+                        return true;
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error("Failed to reload store message audit for queue store " + destination);
+            }
+        }
+    }
+    
     public void addMessage(ConnectionContext context, Message message) throws IOException
{
 
         MessageId messageId = message.getMessageId();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Fri Dec 18 13:03:40 2009
@@ -145,6 +145,16 @@
         }
         return findAllMessagesStatement;
     }
+    
+    public String getFindAllMessageIds() {
+        //  this needs to be limited maybe need to use getFindLastSequenceIdInMsgsStatement
+        // and work back for X
+        if (findAllMessagesStatement == null) {
+            findAllMessagesStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName()
+                                       + " WHERE CONTAINER=? ORDER BY ID DESC";
+        }
+        return findAllMessagesStatement;
+    }
 
     public String getFindLastSequenceIdInMsgsStatement() {
         if (findLastSequenceIdInMsgsStatement == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Fri Dec 18 13:03:40 2009
@@ -33,6 +33,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
+import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.Statements;
@@ -326,6 +327,27 @@
         }
     }
 
+    public void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long
limit, 
+            JDBCMessageIdScanListener listener) throws SQLException, IOException {
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIds());
+            s.setString(1, destination.getQualifiedName());
+            // limit the query. just need the last few messages that could be replayed

+            // on recovery. send or commit reply lost so it gets replayed.
+            rs = s.executeQuery();
+            while (rs.next()) {
+                if (!listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)))) {

+                    break;
+                }
+            }
+        } finally {
+            close(rs);
+            close(s);
+        }
+    }
+    
     public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String
clientId,
             String subscriptionName, long seq) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=892242&r1=892241&r2=892242&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Fri Dec 18 13:03:40 2009
@@ -18,10 +18,14 @@
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -34,10 +38,10 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 // see https://issues.apache.org/activemq/browse/AMQ-2473
@@ -48,7 +52,6 @@
 	private String url = "tcp://localhost:61616";
 	BrokerService broker;
 	
-	@Before
 	public void startCleanBroker() throws Exception {
 	    startBroker(true);
 	}
@@ -69,13 +72,13 @@
 	    broker = new BrokerService();
 	    broker.setUseJmx(false);
 	    broker.addConnector(url);
-	    broker.setDeleteAllMessagesOnStartup(true);
+	    broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
 	    return broker;
 	}
 
-	@Test
+	//@Test
 	public void testFailoverProducerCloseBeforeTransaction() throws Exception {
-		
+	    startCleanBroker();
 		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
 		Connection connection = cf.createConnection();
 		connection.start();
@@ -103,10 +106,20 @@
 	
     @Test
     public void testFailoverCommitReplyLost() throws Exception {
-        
-        broker.stop();
+        doTestFailoverCommitReplyLost(false);
+    }  
+    
+    @Test
+    public void testFailoverCommitReplyLostJdbc() throws Exception {
+        doTestFailoverCommitReplyLost(true);
+    }
+    
+    public void doTestFailoverCommitReplyLost(boolean useJdbcPersistenceAdapter) throws Exception
{
         
         broker = createBroker(true);
+        if (useJdbcPersistenceAdapter) {
+            broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
+        }
         broker.setPlugins(new BrokerPlugin[] {
                 new BrokerPluginSupport() {
                     @Override
@@ -141,13 +154,15 @@
         
         TextMessage message = session.createTextMessage("Test message");
         producer.send(message);
-
+        
+        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
         // broker will die on commit reply so this will hang till restart
         Executors.newSingleThreadExecutor().execute(new Runnable() {   
             public void run() {
                 LOG.info("doing async commit...");
                 try {
                     session.commit();
+                    commitDoneLatch.countDown();
                     LOG.info("done async commit");
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -155,18 +170,54 @@
             }
         });
        
+        // will be stopped by the plugin
         broker.waitUntilStopped();
-        startBroker(false);
+        broker = createBroker(false);
+        if (useJdbcPersistenceAdapter) {
+            broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
+        }
+        broker.start();
 
-        assertNotNull("we got the message", consumer.receive(20000));
+        assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+        
+        // new transaction
+        Message msg = consumer.receive(20000);
+        LOG.info("Received: " + msg);
+        assertNotNull("we got the message", msg);
         assertNull("we got just one message", consumer.receive(2000));
-        session.commit();   
+        session.commit();
+        consumer.close();
+        connection.close();
+        
+        // ensure no dangling messages with fresh broker etc
+        broker.stop();
+        broker.waitUntilStopped();
+        
+        LOG.info("Checking for remaining/hung messages..");
+        broker = createBroker(false);
+        if (useJdbcPersistenceAdapter) {
+            broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
+        }
+        broker.start();
+        
+        // after restart, ensure no dangling messages
+        cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        connection = cf.createConnection();
+        connection.start();
+        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session2.createConsumer(destination);
+        msg = consumer.receive(1000);
+        if (msg == null) {
+            msg = consumer.receive(5000);
+        }
+        LOG.info("Received: " + msg);
+        assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();
     }
 
 	@Test
 	public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception
{
-	        
+	    startCleanBroker();        
 	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
 	    Connection connection = cf.createConnection();
 	    connection.start();
@@ -188,15 +239,15 @@
 	    
 	    session.commit();
 	    
-	    // withough tracking producers, message will not be replayed on recovery
-	    assertNull("we got the message", consumer.receive(2000));
+	    // without tracking producers, message will not be replayed on recovery
+	    assertNull("we got the message", consumer.receive(5000));
 	    session.commit();   
 	    connection.close();
 	}
 	
 	@Test
 	public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
-	        
+	    startCleanBroker();	        
 	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
 	    Connection connection = cf.createConnection();
 	    connection.start();



Mime
View raw message