activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1197072 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: ProducerBrokerExchange.java TransportConnection.java
Date Thu, 03 Nov 2011 11:53:31 GMT
Author: gtully
Date: Thu Nov  3 11:53:31 2011
New Revision: 1197072

URL: http://svn.apache.org/viewvc?rev=1197072&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3576 - fix regression in network connector tests,
the sequence id check is not valid for bridges that concentrate messages from multiple consumers.
duplicate suppression for network connectors needs to be based on message ids

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=1197072&r1=1197071&r2=1197072&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
Thu Nov  3 11:53:31 2011
@@ -39,6 +39,7 @@ public class ProducerBrokerExchange {
     private ProducerState producerState;
     private boolean mutable = true;
     private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
+    private boolean auditProducerSequenceIds;
     
     public ProducerBrokerExchange() {
     }
@@ -131,20 +132,23 @@ public class ProducerBrokerExchange {
      */
     public boolean canDispatch(Message messageSend) {
         boolean canDispatch = true;
-        if (lastSendSequenceNumber.get() > 0) {
+        if (auditProducerSequenceIds) {
             if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get())
{
                 canDispatch = false;
                 LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId()
+ "] with producerSequenceId ["
                         + messageSend.getMessageId().getProducerSequenceId() + "] less than
last stored: "  + lastSendSequenceNumber);
             }
-        }
-        if (canDispatch) {
-            lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
+
+            if (canDispatch) {
+                // track current so we can suppress duplicates later in the stream
+                lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
+            }
         }
         return canDispatch;
     }
 
     public void setLastStoredSequenceId(long l) {
+        auditProducerSequenceIds = true;
         lastSendSequenceNumber.set(l);
         LOG.debug("last stored sequence id set: " + l);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1197072&r1=1197071&r2=1197072&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Thu Nov  3 11:53:31 2011
@@ -1313,7 +1313,7 @@ public class TransportConnection impleme
                 result = new ProducerBrokerExchange();
                 TransportConnectionState state = lookupConnectionState(id);
                 context = state.getContext();
-                if (context.isReconnect()) {
+                if (context.isReconnect() && !context.isNetworkConnection()) {
                     result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
                 }
                 result.setConnectionContext(context);



Mime
View raw message