activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1185765 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/network/
Date Tue, 18 Oct 2011 17:31:27 GMT
Author: gtully
Date: Tue Oct 18 17:31:27 2011
New Revision: 1185765

URL: http://svn.apache.org/viewvc?rev=1185765&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3542 - further simplification of network bridge
start, remove latch wait states, add duplex test variant

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Tue Oct 18 17:31:27 2011
@@ -534,7 +534,7 @@ public class BrokerService implements Se
             getBroker().brokerServiceStarted();
             startedLatch.countDown();
         } catch (Exception e) {
-            LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
+            LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() +
", " + brokerId + "). Reason: " + e, e);
             try {
                 if (!stopped.get()) {
                     stop();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
Tue Oct 18 17:31:27 2011
@@ -18,14 +18,8 @@ package org.apache.activemq.network;
 
 import java.io.IOException;
 
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Endpoint;
-import org.apache.activemq.command.NetworkBridgeFilter;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,71 +34,13 @@ import org.slf4j.LoggerFactory;
 public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CompositeDemandForwardingBridge.class);
 
-    protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
-    protected Object brokerInfoMutex = new Object();
-
     public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport
localBroker,
                                            Transport remoteBroker) {
         super(configuration, localBroker, remoteBroker);
         remoteBrokerName = remoteBroker.toString();
-        remoteBrokerNameKnownLatch.countDown();
-    }
-
-    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
-        synchronized (brokerInfoMutex) {
-            BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
-            BrokerId remoteBrokerId = remoteBrokerInfo.getBrokerId();
-
-            // lets associate the incoming endpoint with a broker ID so we can
-            // refer to it later
-            Endpoint from = command.getFrom();
-            if (from == null) {
-                LOG.warn("Incoming command does not have a from endpoint: " + command);
-            } else {
-                from.setBrokerInfo(remoteBrokerInfo);
-            }
-            if (localBrokerId != null) {
-                if (localBrokerId.equals(remoteBrokerId)) {
-                    LOG.info("Disconnecting loop back connection.");
-                    // waitStarted();
-                    ServiceSupport.dispose(this);
-                }
-            }
-            if (!disposed.get()) {
-                triggerLocalStartBridge();
-            }
-        }
-    }
-
-    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
-        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getFromBrokerId(info)));
-    }
-
-    /**
-     * Returns the broker ID that the command came from
-     */
-    protected BrokerId getFromBrokerId(Command command) throws IOException {
-        BrokerId answer = null;
-        Endpoint from = command.getFrom();
-        if (from == null) {
-            LOG.warn("Incoming command does not have a from endpoint: " + command);
-        } else {
-            answer = from.getBrokerId();
-        }
-        if (answer != null) {
-            return answer;
-        } else {
-            throw new IOException("No broker ID is available for endpoint: " + from + " from
command: "
-                                  + command);
-        }
     }
 
     protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
         // TODO is there much we can do here?
     }
-
-    protected BrokerId[] getRemoteBrokerPath() {
-        return remoteBrokerPath;
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Tue Oct 18 17:31:27 2011
@@ -16,15 +16,7 @@
  */
 package org.apache.activemq.network;
 
-import java.io.IOException;
-
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.NetworkBridgeFilter;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,58 +30,8 @@ import org.slf4j.LoggerFactory;
 public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
     private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridge.class);
 
-    protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
-    protected Object brokerInfoMutex = new Object();
-    protected BrokerId remoteBrokerId;
-
     public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
                                   Transport remoteBroker) {
         super(configuration, localBroker, remoteBroker);
     }
-
-    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
-        synchronized (brokerInfoMutex) {
-            BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
-            remoteBrokerId = remoteBrokerInfo.getBrokerId();
-            remoteBrokerPath[0] = remoteBrokerId;
-            remoteBrokerName = remoteBrokerInfo.getBrokerName();
-            if (localBrokerId != null) {
-                if (localBrokerId.equals(remoteBrokerId)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(configuration.getBrokerName() + " disconnecting remote
loop back connection: " + remoteBrokerName);
-                    }
-                    ServiceSupport.dispose(this);
-                }
-            }
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("counting down remoteBrokerNameKnownLatch with: " + command);
-            }
-            remoteBrokerNameKnownLatch.countDown();
-        }
-    }
-
-    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
-        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
-    }
-
-    protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
-        synchronized (brokerInfoMutex) {
-            localBrokerId = ((BrokerInfo)command).getBrokerId();
-            localBrokerPath[0] = localBrokerId;
-            localBrokerIdKnownLatch.countDown();
-            if (remoteBrokerId != null) {
-                if (remoteBrokerId.equals(localBrokerId)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(configuration.getBrokerName() + " disconnecting local loop
back connection.");
-                    }
-                    waitStarted();
-                    ServiceSupport.dispose(this);
-                }
-            }
-        }
-    }
-
-    protected BrokerId[] getRemoteBrokerPath() {
-        return remoteBrokerPath;
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Tue Oct 18 17:31:27 2011
@@ -36,7 +36,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -74,7 +73,6 @@ import org.apache.activemq.transport.Res
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFilter;
-import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.*;
 import org.slf4j.Logger;
@@ -114,12 +112,14 @@ public abstract class DemandForwardingBr
     protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
     protected CountDownLatch startedLatch = new CountDownLatch(2);
     protected CountDownLatch localStartedLatch = new CountDownLatch(1);
-    protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
-    protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
     protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
     protected NetworkBridgeConfiguration configuration;
     protected NetworkBridgeFilterFactory filterFactory;
 
+    protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
+    protected Object brokerInfoMutex = new Object();
+    protected BrokerId remoteBrokerId;
+
     final AtomicLong enqueueCounter = new AtomicLong();
     final AtomicLong dequeueCounter = new AtomicLong();
 
@@ -222,14 +222,12 @@ public abstract class DemandForwardingBr
         });
     }
 
-    protected void startLocalBridge() throws Throwable {
+    private void startLocalBridge() throws Throwable {
         if (localBridgeStarted.compareAndSet(false, true)) {
             synchronized (this) {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker="
+ localBroker);
                 }
-                remoteBrokerNameKnownLatch.await();
-
                 if (!disposed.get()) {
                     localConnectionInfo = new ConnectionInfo();
                     localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
@@ -277,7 +275,7 @@ public abstract class DemandForwardingBr
     protected void startRemoteBridge() throws Exception {
         if (remoteBridgeStarted.compareAndSet(false, true)) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker="
+ localBroker);
+                LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker="
+ remoteBroker);
             }
             synchronized (this) {
                 if (!isCreatedByDuplex()) {
@@ -291,7 +289,6 @@ public abstract class DemandForwardingBr
                     IntrospectionSupport.getProperties(configuration, props, null);
                     String str = MarshallingSupport.propertiesToString(props);
                     brokerInfo.setNetworkProperties(str);
-                    localBrokerIdKnownLatch.await();
                     brokerInfo.setBrokerId(this.localBrokerId);
                     remoteBroker.oneway(brokerInfo);
                 }
@@ -322,9 +319,6 @@ public abstract class DemandForwardingBr
                 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
                 remoteBroker.oneway(demandConsumerInfo);
                 startedLatch.countDown();
-                if (!disposed.get()) {
-                    triggerLocalStartBridge();
-                }
             }
         }
     }
@@ -372,7 +366,6 @@ public abstract class DemandForwardingBr
             brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
             brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
             LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + "
stopped");
-            remoteBrokerNameKnownLatch.countDown();
         }
     }
 
@@ -1161,7 +1154,6 @@ public abstract class DemandForwardingBr
 
     protected void waitStarted() throws InterruptedException {
         startedLatch.await();
-        localBrokerIdKnownLatch.await();
     }
 
     protected void clearDownSubscriptions() {
@@ -1184,13 +1176,47 @@ public abstract class DemandForwardingBr
          return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL()
);
     }
 
-    protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;
+    protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
+        synchronized (brokerInfoMutex) {
+            if (remoteBrokerId != null) {
+                if (remoteBrokerId.equals(localBrokerId)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(configuration.getBrokerName() + " disconnecting local loop
back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
+                    }
+                    waitStarted();
+                    ServiceSupport.dispose(this);
+                }
+            }
+        }
+    }
 
-    protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
+    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
+        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
+    }
 
-    protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
+    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
+        synchronized (brokerInfoMutex) {
+            BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
+            remoteBrokerId = remoteBrokerInfo.getBrokerId();
+            remoteBrokerPath[0] = remoteBrokerId;
+            remoteBrokerName = remoteBrokerInfo.getBrokerName();
+            if (localBrokerId != null) {
+                if (localBrokerId.equals(remoteBrokerId)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(configuration.getBrokerName() + " disconnecting remote
loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
+                    }
+                    ServiceSupport.dispose(this);
+                }
+            }
+            if (!disposed.get()) {
+                triggerLocalStartBridge();
+            }
+        }
+    }
 
-    protected abstract BrokerId[] getRemoteBrokerPath();
+    protected  BrokerId[] getRemoteBrokerPath() {
+        return remoteBrokerPath;
+    }
 
     public void setNetworkBridgeListener(NetworkBridgeListener listener) {
         this.networkBridgeListener = listener;
@@ -1233,6 +1259,8 @@ public abstract class DemandForwardingBr
 
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
+        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
+        localBrokerPath[0] = localBrokerId;
     }
 
     public void setMbeanObjectName(ObjectName objectName) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Tue Oct 18 17:31:27 2011
@@ -96,10 +96,16 @@ public class DiscoveryNetworkConnector e
                     return;
                 }
             }
-            if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri)))
{
+            if (localURI.equals(uri)) {
                 LOG.debug("not connecting loopback: " + uri);
                 return;
             }
+
+            if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
+                LOG.debug("connectionFilter disallows connection to: " + uri);
+                return;
+            }
+
             URI connectUri = uri;
             try {
                 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
Tue Oct 18 17:31:27 2011
@@ -278,7 +278,6 @@ public class FailoverStaticNetworkTest {
         brokerC.start();
         assertTrue("all props applied a second time", networkConnectorProps.isEmpty());
 
-        //Thread.sleep(4000);
         doTestNetworkSendReceive(brokerC, brokerB);
         doTestNetworkSendReceive(brokerB, brokerC);
 
@@ -321,8 +320,20 @@ public class FailoverStaticNetworkTest {
 
     @Test
     public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception {
+        doTestRepeatedSendReceiveWithMasterSlaveAlternate(null);
+    }
+
+    @Test
+    public void testRepeatedSendReceiveWithMasterSlaveAlternateDuplex() throws Exception
{
+        HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
+        networkConnectorProps.put("duplex", "true");
+
+        doTestRepeatedSendReceiveWithMasterSlaveAlternate(networkConnectorProps);
+    }
+
+    public void doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap<String, String>
networkConnectorProps) throws Exception {
 
-        brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"});
+        brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"}, networkConnectorProps);
         brokerB.start();
 
         final AtomicBoolean done = new AtomicBoolean(false);
@@ -380,7 +391,7 @@ public class FailoverStaticNetworkTest {
             }
         });
 
-        for (int i=0; i<10; i++) {
+        for (int i=0; i<4; i++) {
             BrokerService currentMaster =  (i%2 == 0 ? brokerA : brokerA1);
             LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
             currentMaster.waitUntilStarted();
@@ -392,7 +403,7 @@ public class FailoverStaticNetworkTest {
             currentMaster.waitUntilStopped();
         }
 
-        done.set(false);
+        done.set(true);
         LOG.info("all done");
         executorService.shutdownNow();
     }



Mime
View raw message