activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r822811 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/bugs/
Date Wed, 07 Oct 2009 17:41:21 GMT
Author: gtully
Date: Wed Oct  7 17:41:21 2009
New Revision: 822811

URL: http://svn.apache.org/viewvc?rev=822811&view=rev
Log:
resolve duplicate message issue from: https://issues.apache.org/activemq/browse/AMQ-2439

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=822811&r1=822810&r2=822811&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Wed Oct  7 17:41:21 2009
@@ -357,20 +357,12 @@
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws
Exception {
         Subscription sub = consumerExchange.getSubscription();
         if (sub == null) {
-            sub = subscriptions.get(ack.getConsumerId());
-            
+            sub = subscriptions.get(ack.getConsumerId());        
             if (sub == null) {
-                //networked subscriptions are going to acknowledge in flight messages 
-                //on behalf a subscription that is no more ...
-                if (!consumerExchange.getConnectionContext().isNetworkConnection()
-                     && !consumerExchange.getConnectionContext()
-                                .isInRecoveryMode()) {
-                    throw new IllegalArgumentException(
-                            "The subscription does not exist: "
-                                    + ack.getConsumerId());
-                } else {
-                    return;
-                }
+                LOG.warn("Ack for non existent subscription, ack:" + ack); 
+                throw new IllegalArgumentException(
+                        "The subscription does not exist: "
+                        + ack.getConsumerId());
             }
             consumerExchange.setSubscription(sub);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=822811&r1=822810&r2=822811&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Oct  7 17:41:21 2009
@@ -623,13 +623,24 @@
         }
     }
 
-    protected void removeSubscription(DemandSubscription sub) throws IOException {
+    protected void removeSubscription(final DemandSubscription sub) throws IOException {
         if (sub != null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(configuration.getBrokerName() + " remove local subscription for
remote " + sub.getRemoteInfo().getConsumerId());
             }
-            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+            
+            // continue removal in separate thread to free up this thread for outstanding
responses
+            ASYNC_TASKS.execute(new Runnable() {
+                public void run() {
+                    sub.waitForCompletion();
+                    try {
+                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+                    } catch (IOException e) {
+                        LOG.warn("failed to deliver remove command for local subscription,
for remote " + sub.getRemoteInfo().getConsumerId(), e);
+                    }
+                }
+            });
         }
     }
 
@@ -652,9 +663,8 @@
                 if (command.isMessageDispatch()) {
                     enqueueCounter.incrementAndGet();
                     final MessageDispatch md = (MessageDispatch)command;   
-                    DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
+                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage()!=null) {
-                    	
                         // See if this consumer's brokerPath tells us it came from the broker
at the other end
                         // of the bridge. I think we should be making this decision based
on the message's
                         // broker bread crumbs and not the consumer's? However, the message's
broker bread
@@ -685,8 +695,8 @@
                                 }
                             }
                             localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1));
-                            dequeueCounter.incrementAndGet();                          
-
+                            dequeueCounter.incrementAndGet();
+                            
                         } else {
 
                             // The message was not sent using async send, so we
@@ -703,16 +713,20 @@
                                         } else {
                                             localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1));
                                             dequeueCounter.incrementAndGet();
+                                           
                                         }
                                     } catch (IOException e) {
                                         serviceLocalException(e);
+                                    } finally {
+                                        sub.decrementOutstandingResponses();
                                     }
                                 }
                             };
 
                             remoteBroker.asyncRequest(message, callback);
+                            sub.incrementOutstandingResponses();
                         }
-
+                        
                     } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("No subscription registered with this network bridge
for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=822811&r1=822810&r2=822811&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Wed Oct  7 17:41:21 2009
@@ -18,10 +18,13 @@
 
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Represents a network bridge interface
@@ -29,10 +32,13 @@
  * @version $Revision: 1.1 $
  */
 public class DemandSubscription {
+    private static final Log LOG = LogFactory.getLog(DemandSubscription.class);
+    
     private final ConsumerInfo remoteInfo;
     private final ConsumerInfo localInfo;
     private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
     private AtomicInteger dispatched = new AtomicInteger(0);
+    private AtomicBoolean activeWaiter = new AtomicBoolean();
 
     DemandSubscription(ConsumerInfo info) {
         remoteInfo = info;
@@ -69,27 +75,6 @@
     }
 
     /**
-     * @return Returns the dispatched.
-     */
-    public int getDispatched() {
-        return dispatched.get();
-    }
-
-    /**
-     * @param dispatched The dispatched to set.
-     */
-    public void setDispatched(int dispatched) {
-        this.dispatched.set(dispatched);
-    }
-
-    /**
-     * @return dispatched count after incremented
-     */
-    public int incrementDispatched() {
-        return dispatched.incrementAndGet();
-    }
-
-    /**
      * @return Returns the localInfo.
      */
     public ConsumerInfo getLocalInfo() {
@@ -102,5 +87,37 @@
      */
     public ConsumerInfo getRemoteInfo() {
         return remoteInfo;
-    }    
+    }
+
+    public void waitForCompletion() {
+        if (dispatched.get() > 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId()
+ ", dispatched: " + this.dispatched.get());
+            }
+            activeWaiter.set(true);
+            if (dispatched.get() > 0) {
+                synchronized (activeWaiter) {
+                    try {
+                        activeWaiter.wait();
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+                if (this.dispatched.get() > 0) {
+                    LOG.warn("demand sub interrupted or timedout while waiting for outstanding
responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
+                }
+            }
+        }
+    }
+
+    public void decrementOutstandingResponses() {
+        if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
+            synchronized(activeWaiter) {
+                activeWaiter.notifyAll();
+            }
+        }
+    }
+
+    public void incrementOutstandingResponses() {
+        dispatched.incrementAndGet(); 
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java?rev=822811&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java Wed Oct  7 17:41:21 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.jmx.BrokerView;
+
+public class AMQ2439Test extends JmsMultipleBrokersTestSupport {
+    Destination dest;
+
+    
+    public void testDuplicatesThroughNetwork() throws Exception {
+        assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
+        assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
+        validateQueueStats();
+    }
+    
+    private void validateQueueStats() throws Exception {
+       BrokerView brokerView = brokers.get("BrokerA").broker.getAdminView();
+       assertEquals("enequeue is correct", 1000, brokerView.getTotalEnqueueCount());
+       assertEquals("dequeue is correct", 1000, brokerView.getTotalDequeueCount());
+    }
+
+    protected int receiveExactMessages(String brokerName, int msgCount) throws Exception
{
+        
+        BrokerItem brokerItem = brokers.get(brokerName);
+        Connection connection = brokerItem.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
  
+        MessageConsumer consumer = session.createConsumer(dest);
+        
+        Message msg;
+        int i;
+        for (i = 0; i < msgCount; i++) {
+            msg = consumer.receive(1000);
+            if (msg == null) {
+                break;
+            }
+        }
+
+        connection.close();
+        brokerItem.connections.remove(connection);
+        
+        return i;
+    }
+    
+    public void setUp() throws Exception {
+        super.setUp();
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=true&deleteAllMessagesOnStartup=true&advisorySupport=false"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=true&deleteAllMessagesOnStartup=true&useJmx=false"));
+        bridgeBrokers("BrokerA", "BrokerB");
+        
+        startAllBrokers();
+        
+        // Create queue
+        dest = createDestination("TEST.FOO", false);
+        sendMessages("BrokerA", dest, 1000);
+    }   
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message