activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1438123 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
Date Thu, 24 Jan 2013 18:39:09 GMT
Author: tabish
Date: Thu Jan 24 18:39:09 2013
New Revision: 1438123

URL: http://svn.apache.org/viewvc?rev=1438123&view=rev
Log:
apply fix and updated test for: https://issues.apache.org/jira/browse/AMQ-4148

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java 
 (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1438123&r1=1438122&r2=1438123&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Thu Jan 24 18:39:09 2013
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -140,7 +139,7 @@ public abstract class DemandForwardingBr
     private TransportConnection duplexInitiatingConnection;
     private BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
-    private ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
+    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
 
     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport
localBroker, Transport remoteBroker) {
         this.configuration = configuration;
@@ -156,6 +155,7 @@ public abstract class DemandForwardingBr
         serviceRemoteCommand(remoteBrokerInfo);
     }
 
+    @Override
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
 
@@ -178,11 +178,13 @@ public abstract class DemandForwardingBr
             });
             remoteBroker.setTransportListener(new DefaultTransportListener() {
 
+                @Override
                 public void onCommand(Object o) {
                     Command command = (Command) o;
                     serviceRemoteCommand(command);
                 }
 
+                @Override
                 public void onException(IOException error) {
                     serviceRemoteException(error);
                 }
@@ -206,6 +208,7 @@ public abstract class DemandForwardingBr
 
     protected void triggerLocalStartBridge() throws IOException {
         brokerService.getTaskRunnerFactory().execute(new Runnable() {
+            @Override
             public void run() {
                 final String originalName = Thread.currentThread().getName();
                 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
@@ -222,6 +225,7 @@ public abstract class DemandForwardingBr
 
     protected void triggerRemoteStartBridge() throws IOException {
         brokerService.getTaskRunnerFactory().execute(new Runnable() {
+            @Override
             public void run() {
                 final String originalName = Thread.currentThread().getName();
                 Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
@@ -344,6 +348,7 @@ public abstract class DemandForwardingBr
         }
     }
 
+    @Override
     public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
             if (disposed.compareAndSet(false, true)) {
@@ -357,6 +362,7 @@ public abstract class DemandForwardingBr
                     final CountDownLatch sendShutdown = new CountDownLatch(1);
 
                     brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                        @Override
                         public void run() {
                             try {
                                 serialExecutor.shutdown();
@@ -400,6 +406,7 @@ public abstract class DemandForwardingBr
         }
     }
 
+    @Override
     public void serviceRemoteException(Throwable error) {
         if (!disposed.get()) {
             if (error instanceof SecurityException || error instanceof GeneralSecurityException)
{
@@ -409,6 +416,7 @@ public abstract class DemandForwardingBr
             }
             LOG.debug("The remote Exception was: " + error, error);
             brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                @Override
                 public void run() {
                     ServiceSupport.dispose(getControllingService());
                 }
@@ -631,6 +639,7 @@ public abstract class DemandForwardingBr
             if (destInfo.isRemoveOperation()) {
                 // serialise with removeSub operations such that all removeSub advisories
are generated
                 serialExecutor.execute(new Runnable() {
+                    @Override
                     public void run() {
                         try {
                             localBroker.oneway(destInfo);
@@ -648,11 +657,13 @@ public abstract class DemandForwardingBr
         }
     }
 
+    @Override
     public void serviceLocalException(Throwable error) {
         if (!disposed.get()) {
             LOG.info("Network connection between " + localBroker + " and " + remoteBroker
+ " shutdown due to a local error: " + error);
             LOG.debug("The local Exception was:" + error, error);
             brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                @Override
                 public void run() {
                     ServiceSupport.dispose(getControllingService());
                 }
@@ -683,6 +694,7 @@ public abstract class DemandForwardingBr
             // serialise with removeDestination operations so that removeSubs are serialised
with removeDestinations
             // such that all removeSub advisories are generated
             serialExecutor.execute(new Runnable() {
+                @Override
                 public void run() {
                     sub.waitForCompletion();
                     try {
@@ -760,6 +772,7 @@ public abstract class DemandForwardingBr
                             // broker when we get confirmation that the remote
                             // broker has received the message.
                             ResponseCallback callback = new ResponseCallback() {
+                                @Override
                                 public void onCompletion(FutureResponse future) {
                                     try {
                                         Response response = future.getResult();
@@ -1184,6 +1197,9 @@ public abstract class DemandForwardingBr
         ConsumerInfo info = new ConsumerInfo();
         info.setDestination(destination);
 
+        // Indicate that this subscription is being made on behalf of the remote broker.
+        info.setBrokerPath(new BrokerId[] { remoteBrokerId });
+
         // the remote info held by the DemandSubscription holds the original consumerId,
         // the local info get's overwritten
         info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
@@ -1307,6 +1323,7 @@ public abstract class DemandForwardingBr
         return remoteBrokerPath;
     }
 
+    @Override
     public void setNetworkBridgeListener(NetworkBridgeListener listener) {
         this.networkBridgeListener = listener;
     }
@@ -1318,26 +1335,32 @@ public abstract class DemandForwardingBr
         }
     }
 
+    @Override
     public String getRemoteAddress() {
         return remoteBroker.getRemoteAddress();
     }
 
+    @Override
     public String getLocalAddress() {
         return localBroker.getRemoteAddress();
     }
 
+    @Override
     public String getRemoteBrokerName() {
         return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
     }
 
+    @Override
     public String getLocalBrokerName() {
         return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
     }
 
+    @Override
     public long getDequeueCounter() {
         return dequeueCounter.get();
     }
 
+    @Override
     public long getEnqueueCounter() {
         return enqueueCounter.get();
     }
@@ -1350,16 +1373,19 @@ public abstract class DemandForwardingBr
         return subscriptionMapByRemoteId;
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
         this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
         localBrokerPath[0] = localBrokerId;
     }
 
+    @Override
     public void setMbeanObjectName(ObjectName objectName) {
         this.mbeanObjectName = objectName;
     }
 
+    @Override
     public ObjectName getMbeanObjectName() {
         return mbeanObjectName;
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java?rev=1438123&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java Thu
Jan 24 18:39:09 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.junit.Assert;
+
+/**
+ * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby
+ * a static subscription from broker1 to broker2 is forwarded to broker3 even
+ * though the network TTL is 1. This results in duplicate subscriptions on
+ * broker3.
+ */
+public class AMQ4148Test extends JmsMultipleBrokersTestSupport {
+
+    public void test() throws Exception {
+        // Create a hub-and-spoke network where each hub-spoke pair share
+        // messages on a test queue.
+        BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false"));
+
+        final BrokerService[] spokes = new BrokerService[4];
+        for (int i = 0; i < spokes.length; i++) {
+            spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false"));
+
+        }
+        startAllBrokers();
+
+        ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName()
+ ".queue", false);
+
+        NetworkConnector[] ncs = new NetworkConnector[spokes.length];
+        for (int i = 0; i < spokes.length; i++) {
+            NetworkConnector nc = bridgeBrokers("hub", "spoke" + i);
+            nc.setNetworkTTL(1);
+            nc.setDuplex(true);
+            nc.setConduitSubscriptions(false);
+            nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue));
+            nc.start();
+
+            ncs[i] = nc;
+        }
+
+        waitForBridgeFormation();
+
+        // Pause to allow subscriptions to be created.
+        TimeUnit.SECONDS.sleep(5);
+
+        // Verify that the hub has a subscription from each spoke, but that each
+        // spoke has a single subscription from the hub (since the network TTL is 1).
+        final Destination hubTestQueue = hub.getDestination(testQueue);
+        assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size()
+ "}",
+            Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return spokes.length == hubTestQueue.getConsumers().size();
+                }
+            })
+        );
+
+        // Now check each spoke has exactly one consumer on the Queue.
+        for (int i = 0; i < 4; i++) {
+            Destination spokeTestQueue = spokes[i].getDestination(testQueue);
+            Assert.assertEquals(1, spokeTestQueue.getConsumers().size());
+        }
+
+        for (NetworkConnector nc : ncs) {
+            nc.stop();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message