activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r892781 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/store/jdbc/ main/j...
Date Mon, 21 Dec 2009 11:50:57 GMT
Author: dejanb
Date: Mon Dec 21 11:50:56 2009
New Revision: 892781

URL: http://svn.apache.org/viewvc?rev=892781&view=rev
Log:
merging 891582,891622,892194,892242,892291,892729,892759: https://issues.apache.org/activemq/browse/AMQ-2473,
https://issues.apache.org/activemq/browse/AMQ-2540

Added:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
      - copied, changed from r892242, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
      - copied, changed from r891582, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Mon Dec 21 11:50:56 2009
@@ -541,6 +541,10 @@
         SessionId sessionId = id.getParentId();
         ConnectionId connectionId = sessionId.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
+        if (cs == null) {
+            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
+                    + connectionId);
+        }
         SessionState ss = cs.getSessionState(sessionId);
         if (ss == null) {
             throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
@@ -576,6 +580,9 @@
     public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
         ConnectionId connectionId = id.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
+        if (cs == null) {
+            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
+        }
         SessionState session = cs.getSessionState(id);
         if (session == null) {
             throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
@@ -643,7 +650,7 @@
             }
         }
         registerConnectionState(info.getConnectionId(), state);
-        LOG.debug("Setting up new connection: " + getRemoteAddress());
+        LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
         // Setup the context.
         String clientId = info.getClientId();
         context = new ConnectionContext();
@@ -680,6 +687,7 @@
 
     public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
             throws InterruptedException {
+        LOG.debug("remove connection id: " + id);
         TransportConnectionState cs = lookupConnectionState(id);
         if (cs != null) {
             // Don't allow things to be added to the connection state while we

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Dec 21 11:50:56 2009
@@ -85,9 +85,7 @@
             clearIterator(true);
             recovered = true;
         } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
-            }
+            LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
             storeHasMessages = true;
         }
         return recovered;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Mon Dec 21 11:50:56 2009
@@ -29,7 +29,6 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
@@ -61,6 +60,7 @@
     private boolean restoreProducers = true;
     private boolean restoreTransaction = true;
     private boolean trackMessages = true;
+    private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
     private int currentCacheSize;
     private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
@@ -118,6 +118,9 @@
         // Restore the connections.
         for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
             ConnectionState connectionState = iter.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
+            }
             transport.oneway(connectionState.getInfo());
             restoreTempDestinations(transport, connectionState);
 
@@ -136,18 +139,31 @@
     }
 
     private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
-        for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
-            TransactionState transactionState = (TransactionState)iter.next();
+        for (TransactionState transactionState : connectionState.getTransactionStates()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("tx: " + transactionState.getId());
             }
-            for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
-                Command command = (Command)iterator.next();
+            
+            for (ProducerState producerState : transactionState.getProducerStates().values()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx replay producer :" + producerState.getInfo());
+                }
+                transport.oneway(producerState.getInfo());
+            }
+            
+            for (Command command : transactionState.getCommands()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx replay: " + command);
                 }
                 transport.oneway(command);
             }
+            
+            for (ProducerState producerState : transactionState.getProducerStates().values()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
+                }
+                transport.oneway(producerState.getInfo().createRemoveCommand());
+            }
         }
     }
 
@@ -160,6 +176,9 @@
         // Restore the connection's sessions
         for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
             SessionState sessionState = (SessionState)iter2.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("session: " + sessionState.getInfo().getSessionId());
+            }
             transport.oneway(sessionState.getInfo());
 
             if (restoreProducers) {
@@ -194,6 +213,9 @@
         // Restore the session's producers
         for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
             ProducerState producerState = (ProducerState)iter3.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("producer: " + producerState.getInfo().getProducerId());
+            }
             transport.oneway(producerState.getInfo());
         }
     }
@@ -350,13 +372,22 @@
     public Response processMessage(Message send) throws Exception {
         if (send != null) {
             if (trackTransactions && send.getTransactionId() != null) {
-                ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+                ProducerId producerId = send.getProducerId();
+                ConnectionId connectionId = producerId.getParentId().getParentId();
                 if (connectionId != null) {
                     ConnectionState cs = connectionStates.get(connectionId);
                     if (cs != null) {
                         TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
                         if (transactionState != null) {
                             transactionState.addCommand(send);
+                            
+                            if (trackTransactionProducers) {
+                                // for jmstemplate, track the producer in case it is closed before commit
+                                // and needs to be replayed
+                                SessionState ss = cs.getSessionState(producerId.getParentId());
+                                ProducerState producerState = ss.getProducerState(producerId);
+                                producerState.setTransactionState(transactionState);    
       
+                            }
                         }
                     }
                 }
@@ -500,7 +531,15 @@
     public void setTrackTransactions(boolean trackTransactions) {
         this.trackTransactions = trackTransactions;
     }
+    
+    public boolean isTrackTransactionProducers() {
+        return this.trackTransactionProducers;
+    }
 
+    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
+        this.trackTransactionProducers = trackTransactionProducers;
+    }
+    
     public boolean isRestoreTransaction() {
         return restoreTransaction;
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
Mon Dec 21 11:50:56 2009
@@ -21,6 +21,7 @@
 
 public class ProducerState {
     final ProducerInfo info;
+    private TransactionState transactionState;
 
     public ProducerState(ProducerInfo info) {
         this.info = info;
@@ -34,4 +35,11 @@
         return info;
     }
 
+    public void setTransactionState(TransactionState transactionState) {
+        this.transactionState = transactionState;
+    }
+
+    public TransactionState getTransactionState() {
+        return transactionState;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
Mon Dec 21 11:50:56 2009
@@ -50,9 +50,16 @@
     }
 
     public ProducerState removeProducer(ProducerId id) {
-        return producers.remove(id);
+        ProducerState producerState = producers.remove(id);
+        if (producerState != null) {
+            if (producerState.getTransactionState() != null) {
+                // allow the transaction to recreate dependent producer on recovery
+                producerState.getTransactionState().addProducerState(producerState);
+            }
+        }
+        return producerState;
     }
-
+    
     public void addConsumer(ConsumerInfo info) {
         checkShutdown();
         consumers.put(info.getConsumerId(), new ConsumerState(info));

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
Mon Dec 21 11:50:56 2009
@@ -18,9 +18,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.TransactionId;
 
 public class TransactionState {
@@ -30,6 +33,7 @@
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private boolean prepared;
     private int preparedResult;
+    private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
 
     public TransactionState(TransactionId id) {
         this.id = id;
@@ -78,4 +82,14 @@
         return preparedResult;
     }
 
+    public void addProducerState(ProducerState producerState) {
+        if (producerState != null) {
+            producers.put(producerState.getInfo().getProducerId(), producerState);
+        }
+    }
+
+    public Map<ProducerId, ProducerState> getProducerStates() {
+        return producers;
+    }
+
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Mon Dec 21 11:50:56 2009
@@ -81,4 +81,6 @@
     void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
 
     long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
+
+    void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
 }

Copied: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
(from r892242, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java?p2=activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java&r1=892242&r2=892781&rev=892781&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
Mon Dec 21 11:50:56 2009
@@ -19,5 +19,5 @@
 import org.apache.activemq.command.MessageId;
 
 public interface JDBCMessageIdScanListener {
-    boolean messageId(MessageId id);
+    void messageId(MessageId id);
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Mon Dec 21 11:50:56 2009
@@ -18,8 +18,6 @@
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.ActiveMQMessageAudit;
@@ -28,10 +26,8 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -58,7 +54,7 @@
         this.wireFormat = wireFormat;
         this.audit = audit;
     }
-
+    
     public void addMessage(ConnectionContext context, Message message) throws IOException {
 
         MessageId messageId = message.getMessageId();

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Mon Dec 21 11:50:56 2009
@@ -35,6 +35,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -90,6 +91,7 @@
     protected int maxProducersToAudit=1024;
     protected int maxAuditDepth=1000;
     protected boolean enableAudit=true;
+    protected int auditRecoveryDepth = 1024;
     protected ActiveMQMessageAudit audit;
 
     public JDBCPersistenceAdapter() {
@@ -126,15 +128,33 @@
         return Collections.EMPTY_SET;
     }
     
-    protected ActiveMQMessageAudit createMessageAudit() {
-    	if (enableAudit && audit == null) {
-    		audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+    protected void createMessageAudit() {
+        if (enableAudit && audit == null) {
+            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+            TransactionContext c = null;
+            
+            try {
+                c = getTransactionContext();
+                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
+                    public void messageId(MessageId id) {
+                        audit.isDuplicate(id);
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
+            } finally {
+                if (c != null) {
+                    try {
+                        c.close();
+                    } catch (Throwable e) {
+                    }
+                }
+            }
     	}
-    	return audit;
     }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
+        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
         if (transactionStore != null) {
             rc = transactionStore.proxy(rc);
         }
@@ -142,7 +162,7 @@
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
+        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
         if (transactionStore != null) {
             rc = transactionStore.proxy(rc);
         }
@@ -234,6 +254,8 @@
                 }
             }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
         }
+        
+        createMessageAudit();
     }
 
     public synchronized void stop() throws Exception {
@@ -625,6 +647,13 @@
 	public void setEnableAudit(boolean enableAudit) {
 		this.enableAudit = enableAudit;
 	}
-    
+
+    public int getAuditRecoveryDepth() {
+        return auditRecoveryDepth;
+    }
+
+    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
+        this.auditRecoveryDepth = auditRecoveryDepth;
+    }
     
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Mon Dec 21 11:50:56 2009
@@ -65,6 +65,7 @@
     private String destinationMessageCountStatement;
     private String findNextMessagesStatement;
     private boolean useLockCreateWhereClause;
+    private String findAllMessageIdsStatement;
 
     public String[] getCreateSchemaStatements() {
         if (createSchemaStatements == null) {
@@ -145,6 +146,16 @@
         }
         return findAllMessagesStatement;
     }
+    
+    public String getFindAllMessageIdsStatement() {
+        //  this needs to be limited maybe need to use getFindLastSequenceIdInMsgsStatement
+        // and work back for X
+        if (findAllMessageIdsStatement == null) {
+            findAllMessageIdsStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName()
+                                       + " ORDER BY ID DESC";
+        }
+        return findAllMessageIdsStatement;
+    }
 
     public String getFindLastSequenceIdInMsgsStatement() {
         if (findLastSequenceIdInMsgsStatement == null) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Mon Dec 21 11:50:56 2009
@@ -31,6 +31,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
+import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.Statements;
@@ -324,6 +325,23 @@
         }
     }
 
+    public void doMessageIdScan(TransactionContext c, int limit, 
+            JDBCMessageIdScanListener listener) throws SQLException, IOException {
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
+            s.setMaxRows(limit);
+            rs = s.executeQuery();
+            while (rs.next()) {
+                listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)));
+            }
+        } finally {
+            close(rs);
+            close(s);
+        }
+    }
+    
     public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName, long seq) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Mon Dec 21 11:50:56 2009
@@ -95,6 +95,7 @@
     private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
     private int backupPoolSize=1;
     private boolean trackMessages = false;
+    private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
     private TransportListener disposedListener = new DefaultTransportListener() {};
     
@@ -233,6 +234,7 @@
             started = true;
             stateTracker.setMaxCacheSize(getMaxCacheSize());
             stateTracker.setTrackMessages(isTrackMessages());
+            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
             if (connectedTransport.get() != null) {
                 stateTracker.restore(connectedTransport.get());
             } else {
@@ -372,6 +374,14 @@
         this.trackMessages = trackMessages;
     }
 
+    public boolean isTrackTransactionProducers() {
+        return this.trackTransactionProducers;
+    }
+
+    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
+        this.trackTransactionProducers = trackTransactionProducers;
+    }
+
     public int getMaxCacheSize() {
         return maxCacheSize;
     }
@@ -495,7 +505,7 @@
 
                     } catch (IOException e) {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);   
+                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
                         }
                         handleTransportFailure(e);
                     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java?rev=892781&r1=892780&r2=892781&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
Mon Dec 21 11:50:56 2009
@@ -28,7 +28,6 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
 
 public class FailoverTimeoutTest extends TestCase {
 	
@@ -43,7 +42,7 @@
 		bs.addConnector(tcpUri);
 		bs.start();
 		
-		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout);
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout + "&useExponentialBackOff=false");
 		Connection connection = cf.createConnection();
 		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 		MessageProducer producer = session.createProducer(session
@@ -59,11 +58,11 @@
 			assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
 		}
 		
-		bs = new BrokerService();
-		
+		bs = new BrokerService();		
 		bs.setUseJmx(false);
 		bs.addConnector(tcpUri);
 		bs.start();
+		bs.waitUntilStarted();
 		
 		producer.send(message);
 		

Copied: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (from r891582, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?p2=activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java&r1=891582&r2=892781&rev=892781&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Mon Dec 21 11:50:56 2009
@@ -18,9 +18,16 @@
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -28,19 +35,26 @@
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 // see https://issues.apache.org/activemq/browse/AMQ-2473
 public class FailoverTransactionTest {
 	
+    private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
 	private static final String QUEUE_NAME = "test.FailoverTransactionTest";
 	private String url = "tcp://localhost:61616";
 	BrokerService broker;
 	
-	@Before
 	public void startCleanBroker() throws Exception {
 	    startBroker(true);
 	}
@@ -53,16 +67,21 @@
 	}
 	
 	public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-	    broker = new BrokerService();
-        broker.setUseJmx(false);
-        broker.addConnector(url);
-        broker.setDeleteAllMessagesOnStartup(true);
+	    broker = createBroker(deleteAllMessagesOnStartup);
         broker.start();
 	}
-	
-	@Test
+
+	public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
  
+	    broker = new BrokerService();
+	    broker.setUseJmx(false);
+	    broker.addConnector(url);
+	    broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+	    return broker;
+	}
+
+	//@Test
 	public void testFailoverProducerCloseBeforeTransaction() throws Exception {
-		
+	    startCleanBroker();
 		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
 		Connection connection = cf.createConnection();
 		connection.start();
@@ -88,9 +107,135 @@
 		connection.close();
 	}
 	
+    @Test
+    public void testFailoverCommitReplyLost() throws Exception {
+        doTestFailoverCommitReplyLost(0);
+    }  
+    
+    @Test
+    public void testFailoverCommitReplyLostJdbc() throws Exception {
+        doTestFailoverCommitReplyLost(1);
+    }
+    
+    @Test
+    public void testFailoverCommitReplyLostKahaDB() throws Exception {
+        doTestFailoverCommitReplyLost(2);
+    }
+    
+    public void doTestFailoverCommitReplyLost(final int adapter) throws Exception {
+        
+        broker = createBroker(true);
+        setPersistenceAdapter(adapter);
+            
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context,
+                            TransactionId xid, boolean onePhase) throws Exception {
+                        super.commitTransaction(context, xid, onePhase);
+                        // so commit will hang as if reply is lost
+                        context.setDontSendReponse(true);
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                            public void run() {
+                                LOG.info("Stopping broker post commit...");
+                                try {
+                                    broker.stop();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        });
+                   }   
+                }
+        });
+        broker.start();
+        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+        
+        TextMessage message = session.createTextMessage("Test message");
+        producer.send(message);
+        
+        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+        // broker will die on commit reply so this will hang till restart
+        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+            public void run() {
+                LOG.info("doing async commit...");
+                try {
+                    session.commit();
+                    commitDoneLatch.countDown();
+                    LOG.info("done async commit");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+       
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false);
+        setPersistenceAdapter(adapter);
+        broker.start();
+
+        assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+        
+        // new transaction
+        Message msg = consumer.receive(20000);
+        LOG.info("Received: " + msg);
+        assertNotNull("we got the message", msg);
+        assertNull("we got just one message", consumer.receive(2000));
+        session.commit();
+        consumer.close();
+        connection.close();
+        
+        // ensure no dangling messages with fresh broker etc
+        broker.stop();
+        broker.waitUntilStopped();
+        
+        LOG.info("Checking for remaining/hung messages..");
+        broker = createBroker(false);
+        setPersistenceAdapter(adapter);
+        broker.start();
+        
+        // after restart, ensure no dangling messages
+        cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        connection = cf.createConnection();
+        connection.start();
+        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session2.createConsumer(destination);
+        msg = consumer.receive(1000);
+        if (msg == null) {
+            msg = consumer.receive(5000);
+        }
+        LOG.info("Received: " + msg);
+        assertNull("no messges left dangling but got: " + msg, msg);
+        connection.close();
+    }
+
+    private void setPersistenceAdapter(int adapter) throws IOException {
+        switch (adapter) {
+        case 0:
+            break;
+        case 1:
+            broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
+            break;
+        case 2:
+            KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
+            store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
+            broker.setPersistenceAdapter(store);
+            break;
+        }
+    }
+
 	@Test
 	public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
-	        
+	    startCleanBroker();        
 	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
 	    Connection connection = cf.createConnection();
 	    connection.start();
@@ -112,15 +257,15 @@
 	    
 	    session.commit();
 	    
-	    // withough tracking producers, message will not be replayed on recovery
-	    assertNull("we got the message", consumer.receive(2000));
+	    // without tracking producers, message will not be replayed on recovery
+	    assertNull("we got the message", consumer.receive(5000));
 	    session.commit();   
 	    connection.close();
 	}
 	
 	@Test
 	public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
-	        
+	    startCleanBroker();	        
 	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
 	    Connection connection = cf.createConnection();
 	    connection.start();



Mime
View raw message