activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1497716 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ activemq-b...
Date Fri, 28 Jun 2013 10:32:40 GMT
Author: gtully
Date: Fri Jun 28 10:32:40 2013
New Revision: 1497716

URL: http://svn.apache.org/r1497716
Log:
https://issues.apache.org/jira/browse/AMQ-4607 - add network connector consumerTTL and messageTTL - split the effect of networkTTL - allows a message to make many hops in a mesh while consumer demand is not repeatedly replicated. Rollback cursor audit on forward so a message can be redispatched on redelivery. Additional test to verify multiple hops back to origin. Allow infinite TTL or hops with -1, https://issues.apache.org/jira/browse/AMQ-2180.

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java   (with props)
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java
    activemq/trunk/activemq-client/pom.xml
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/openwire/v10/NetworkBridgeFilterMarshaller.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java Fri Jun 28 10:32:40 2013
@@ -38,8 +38,12 @@ public class NetworkConnectorView implem
         return connector.getName();
     }
 
-    public int getNetworkTTL() {
-        return connector.getNetworkTTL();
+    public int getMessageTTL() {
+        return connector.getMessageTTL();
+    }
+
+    public int getConsumerTTL() {
+        return connector.getConsumerTTL();
     }
 
     public int getPrefetchSize() {
@@ -98,8 +102,12 @@ public class NetworkConnectorView implem
         connector.setDynamicOnly(dynamicOnly);
     }
 
-    public void setNetworkTTL(int networkTTL) {
-        connector.setNetworkTTL(networkTTL);
+    public void setMessageTTL(int messageTTL) {
+        connector.setMessageTTL(messageTTL);
+    }
+
+    public void setConsumerTTL(int consumerTTL) {
+        connector.setConsumerTTL(consumerTTL);
     }
 
     public void setPassword(String password) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java Fri Jun 28 10:32:40 2013
@@ -22,7 +22,9 @@ public interface NetworkConnectorViewMBe
 
     String getName();
 
-    int getNetworkTTL();
+    int getMessageTTL();
+
+    int getConsumerTTL();
 
     int getPrefetchSize();
 
@@ -52,7 +54,9 @@ public interface NetworkConnectorViewMBe
 
     void setDynamicOnly(boolean dynamicOnly);
 
-    void setNetworkTTL(int networkTTL);
+    void setMessageTTL(int messageTTL);
+
+    void setConsumerTTL(int consumerTTL);
 
     void setPassword(String password);
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jun 28 10:32:40 2013
@@ -1790,7 +1790,7 @@ public class Queue extends BaseDestinati
                 });
             }
         }
-        if (ack.isPoisonAck()) {
+        if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) {
             // message gone to DLQ, is ok to allow redelivery
             messagesLock.writeLock().lock();
             try{

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Fri Jun 28 10:32:40 2013
@@ -107,7 +107,7 @@ public class AbortSlowConsumerStrategy i
                 try {
                     LOG.info("aborting "
                             + (abortSubscriberConnection ? "connection" : "consumer") 
-                            + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
+                            + ", slow consumer: " + entry.getKey());
 
                     final Connection connection = connectionContext.getConnection();
                     if (connection != null) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java Fri Jun 28 10:32:40 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.network;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.activemq.broker.region.Destination;
@@ -31,7 +32,7 @@ import org.slf4j.LoggerFactory;
 /**
  * implement conditional behavior for queue consumers, allows replaying back to
  * origin if no consumers are present on the local broker after a configurable
- * delay, irrespective of the networkTTL Also allows rate limiting of messages
+ * delay, irrespective of the TTL. Also allows rate limiting of messages
  * through the network, useful for static includes
  *
  * @org.apache.xbean.XBean
@@ -44,10 +45,11 @@ public class ConditionalNetworkBridgeFil
     int rateDuration = 1000;
 
     @Override
-    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
+    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL) {
         ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
         filter.setNetworkBrokerId(remoteBrokerPath[0]);
-        filter.setNetworkTTL(networkTimeToLive);
+        filter.setMessageTTL(messageTTL);
+        filter.setConsumerTTL(consumerTTL);
         filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
         filter.setRateLimit(getRateLimit());
         filter.setRateDuration(getRateDuration());
@@ -104,9 +106,15 @@ public class ConditionalNetworkBridgeFil
                 // potential replay back to origin
                 match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
 
-                if (match && LOG.isTraceEnabled()) {
-                    LOG.trace("Replaying  [" + message.getMessageId() + "] for [" + message.getDestination()
-                            + "] back to origin in the absence of a local consumer");
+                if (LOG.isTraceEnabled()) {
+                    if (match) {
+                        LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+                                + "] back to origin in the absence of a local consumer");
+                    } else {
+                        LOG.trace("Suppressing replay of [" + message.getMessageId() + "] for [" + message.getDestination()
+                                + "] back to origin " + Arrays.asList(message.getBrokerPath()));
+
+                    }
                 }
 
             } else {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri Jun 28 10:32:40 2013
@@ -80,8 +80,7 @@ public class ConduitBridge extends Deman
             DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
             if (filter.matches(info.getDestination())) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " +
-                              ds.getRemoteInfo() + " with sub: " + info.getConsumerId());
+                    LOG.debug(configuration.getBrokerName() + " " + info + " with ids" + info.getNetworkConsumerIds() + " matched (add interest) " + ds);
                 }
                 // add the interest in the subscription
                 if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
@@ -105,7 +104,7 @@ public class ConduitBridge extends Deman
         for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
             if (ds.remove(id)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id  + " existing matched sub: " + ds.getRemoteInfo());
+                    LOG.debug(configuration.getBrokerName() + " on " + localBroker + " from " + remoteBrokerName + " removed interest for: " + id  + " from " + ds);
                 }
             }
             if (ds.isEmpty()) {
@@ -116,7 +115,7 @@ public class ConduitBridge extends Deman
         for (DemandSubscription ds : tmpList) {
             removeSubscription(ds);
             if (LOG.isDebugEnabled()) {
-                LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " :  " + ds.getRemoteInfo());
+                LOG.debug(configuration.getBrokerName() + " on " + localBroker + " from " + remoteBrokerName + " removed " + ds);
             }
         }
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java Fri Jun 28 10:32:40 2013
@@ -27,7 +27,7 @@ import org.apache.activemq.command.Netwo
  *  @org.apache.xbean.XBean
  */
 public class DefaultNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
-    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
-        return new NetworkBridgeFilter(info, remoteBrokerPath[0], networkTimeToLive);
+    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL) {
+        return new NetworkBridgeFilter(info, remoteBrokerPath[0], messageTTL, consumerTTL);
     }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Jun 28 10:32:40 2013
@@ -503,6 +503,7 @@ public abstract class DemandForwardingBr
                     // set our properties
                     Properties props = new Properties();
                     IntrospectionSupport.getProperties(configuration, props, null);
+                    props.remove("networkTTL");
                     String str = MarshallingSupport.propertiesToString(props);
                     brokerInfo.setNetworkProperties(str);
                     brokerInfo.setBrokerId(this.localBrokerId);
@@ -634,15 +635,7 @@ public abstract class DemandForwardingBr
                                 case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                     localStartedLatch.await();
                                     if (started.get()) {
-                                        if (!addConsumerInfo((ConsumerInfo) command)) {
-                                            if (LOG.isDebugEnabled()) {
-                                                LOG.debug("Ignoring ConsumerInfo: " + command);
-                                            }
-                                        } else {
-                                            if (LOG.isTraceEnabled()) {
-                                                LOG.trace("Adding ConsumerInfo: " + command);
-                                            }
-                                        }
+                                        addConsumerInfo((ConsumerInfo) command);
                                     } else {
                                         // received a subscription whilst stopping
                                         LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
@@ -691,7 +684,7 @@ public abstract class DemandForwardingBr
     }
 
     private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
-        final int networkTTL = configuration.getNetworkTTL();
+        final int networkTTL = configuration.getConsumerTTL();
         if (data.getClass() == ConsumerInfo.class) {
             // Create a new local subscription
             ConsumerInfo info = (ConsumerInfo) data;
@@ -704,7 +697,7 @@ public abstract class DemandForwardingBr
                 return;
             }
 
-            if (path != null && path.length >= networkTTL) {
+            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL
                         + " network hops only : " + info);
@@ -732,29 +725,19 @@ public abstract class DemandForwardingBr
             // in a cyclic network there can be multiple bridges per broker that can propagate
             // a network subscription so there is a need to synchronize on a shared entity
             synchronized (brokerService.getVmConnectorURI()) {
-                if (addConsumerInfo(info)) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
-                    }
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
-                            + " as already subscribed to matching destination : " + info);
-                    }
-                }
+                addConsumerInfo(info);
             }
         } else if (data.getClass() == DestinationInfo.class) {
             // It's a destination info - we want to pass up information about temporary destinations
             final DestinationInfo destInfo = (DestinationInfo) data;
             BrokerId[] path = destInfo.getBrokerPath();
-            if (path != null && path.length >= networkTTL) {
+            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
                 }
                 return;
             }
             if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
-                // Ignore this consumer as it's a consumer we locally sent to the broker.
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
                 }
@@ -958,7 +941,7 @@ public abstract class DemandForwardingBr
                         if (suppressMessageDispatch(md, sub)) {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName
-                                    + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
+                                    + " because message came from there or fails TTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
                                     + ", message: " + md.getMessage());
                             }
                             // still ack as it may be durable
@@ -1165,8 +1148,7 @@ public abstract class DemandForwardingBr
         }
     }
 
-    protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
-        boolean consumerAdded = false;
+    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
         ConsumerInfo info = consumerInfo.copy();
         addRemoteBrokerToBrokerPath(info);
         DemandSubscription sub = createDemandSubscription(info);
@@ -1178,10 +1160,9 @@ public abstract class DemandForwardingBr
                     sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
                 }
                 addSubscription(sub);
-                consumerAdded = true;
+                LOG.debug(configuration.getBrokerName() + " new demand subscription: " + sub);
             }
         }
-        return consumerAdded;
     }
 
     private void undoMapRegistration(DemandSubscription sub) {
@@ -1421,7 +1402,7 @@ public abstract class DemandForwardingBr
                 filterFactory = entry.getNetworkBridgeFilterFactory();
             }
         }
-        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
+        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
     }
 
     protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java Fri Jun 28 10:32:40 2013
@@ -52,6 +52,11 @@ public class DemandSubscription {
         remoteSubsIds.add(info.getConsumerId());
     }
 
+    @Override
+    public String toString() {
+        return "DemandSub{" + localInfo.getConsumerId() + ",remotes:" + remoteSubsIds + "}";
+    }
+
     /**
      * Increment the consumers associated with this subscription
      *

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Fri Jun 28 10:32:40 2013
@@ -32,6 +32,9 @@ import org.slf4j.LoggerFactory;
 public class DurableConduitBridge extends ConduitBridge {
     private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
 
+    public String toString() {
+        return "DurableConduitBridge";
+    }
     /**
      * Constructor
      *

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java Fri Jun 28 10:32:40 2013
@@ -302,6 +302,8 @@ public class LdapNetworkConnector extend
         connector.setDynamicOnly(isDynamicOnly());
         connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
         connector.setNetworkTTL(getNetworkTTL());
+        connector.setConsumerTTL(getConsumerTTL());
+        connector.setMessageTTL(getMessageTTL());
         connector.setConduitSubscriptions(isConduitSubscriptions());
         connector.setExcludedDestinations(getExcludedDestinations());
         connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Jun 28 10:32:40 2013
@@ -40,6 +40,9 @@ public class NetworkBridgeConfiguration 
     private boolean bridgeTempDestinations = true;
     private int prefetchSize = 1000;
     private int networkTTL = 1;
+    private int consumerTTL = networkTTL;
+    private int messageTTL = networkTTL;
+
     private String brokerName = "localhost";
     private String brokerURL = "";
     private String userName;
@@ -170,6 +173,8 @@ public class NetworkBridgeConfiguration 
      */
     public void setNetworkTTL(int networkTTL) {
         this.networkTTL = networkTTL;
+        setConsumerTTL(networkTTL);
+        setMessageTTL(networkTTL);
     }
 
     /**
@@ -394,4 +399,20 @@ public class NetworkBridgeConfiguration 
     public void setAdvisoryForFailedForward(boolean advisoryForFailedForward) {
         this.advisoryForFailedForward = advisoryForFailedForward;
     }
+
+    public void setConsumerTTL(int consumerTTL) {
+        this.consumerTTL = consumerTTL;
+    }
+
+    public int getConsumerTTL() {
+        return  consumerTTL;
+    }
+
+    public void setMessageTTL(int messageTTL) {
+        this.messageTTL = messageTTL;
+    }
+
+    public int getMessageTTL() {
+        return messageTTL;
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFilterFactory.java Fri Jun 28 10:32:40 2013
@@ -24,5 +24,5 @@ import org.apache.activemq.command.Netwo
 public interface NetworkBridgeFilterFactory {
     // create a dispatch filter for network consumers, default impl will not send a message back to
     // its origin to prevent looping, the down side is that messages can get stuck
-    NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive);
+    NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL);
 }

Modified: activemq/trunk/activemq-client/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/pom.xml?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-client/pom.xml (original)
+++ activemq/trunk/activemq-client/pom.xml Fri Jun 28 10:32:40 2013
@@ -276,13 +276,13 @@
               </tasks>
             </configuration>
             <dependencies>
-              <dependency>
+              <!-- not needed on osx; dependency>
                 <groupId>com.sun</groupId>
                 <artifactId>tools</artifactId>
                 <version>1.6.0</version>
                 <scope>system</scope>
                 <systemPath>${java.home}/../lib/tools.jar</systemPath>
-              </dependency>
+              </dependency -->
             </dependencies>
           </plugin>
         </plugins>

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java Fri Jun 28 10:32:40 2013
@@ -2033,7 +2033,7 @@ public class ActiveMQSession implements 
                 } catch (JMSException e) {
                     LOG.warn("Exception closing consumer", e);
                 }
-                LOG.warn("Closed consumer on Command");
+                LOG.warn("Closed consumer on Command, " + id);
                 break;
             }
         }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Fri Jun 28 10:32:40 2013
@@ -28,7 +28,7 @@ import java.util.Arrays;
 
 /**
  * @openwire:marshaller code="91"
- * 
+ *
  */
 public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
 
@@ -36,15 +36,17 @@ public class NetworkBridgeFilter impleme
     static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
 
     protected BrokerId networkBrokerId;
-    protected int networkTTL;
+    protected int messageTTL;
+    protected int consumerTTL;
     transient ConsumerInfo consumerInfo;
 
     public NetworkBridgeFilter() {
     }
 
-    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int networkTTL) {
+    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
         this.networkBrokerId = networkBrokerId;
-        this.networkTTL = networkTTL;
+        this.messageTTL = messageTTL;
+        this.consumerTTL = consumerTTL;
         this.consumerInfo = consumerInfo;
     }
 
@@ -86,9 +88,9 @@ public class NetworkBridgeFilter impleme
 
         int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
 
-        if (hops >= networkTTL) {
+        if (messageTTL > -1 && hops >= messageTTL) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
+                LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message);
             }
             return false;
         }
@@ -103,9 +105,9 @@ public class NetworkBridgeFilter impleme
             } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
                 ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
                 hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
-                if (hops >= networkTTL) {
+                if (consumerTTL > -1 && hops >= consumerTTL) {
                     if (LOG.isTraceEnabled()) {
-                        LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
+                        LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message);
                     }
                     return false;
                 }
@@ -132,15 +134,15 @@ public class NetworkBridgeFilter impleme
         return false;
     }
 
-    /**
-     * @openwire:property version=1
-     */
+    // kept for backward compatibility with older wire formats:
+    // the legacy networkTTL reads messageTTL and sets both TTLs
     public int getNetworkTTL() {
-        return networkTTL;
+        return messageTTL;
     }
 
     public void setNetworkTTL(int networkTTL) {
-        this.networkTTL = networkTTL;
+        messageTTL = networkTTL;
+        consumerTTL = networkTTL;
     }
 
     /**
@@ -154,4 +156,25 @@ public class NetworkBridgeFilter impleme
         this.networkBrokerId = remoteBrokerPath;
     }
 
+    public void setMessageTTL(int messageTTL) {
+        this.messageTTL = messageTTL;
+    }
+
+    /**
+     * @openwire:property version=10
+     */
+    public int getMessageTTL() {
+        return this.messageTTL;
+    }
+
+    public void setConsumerTTL(int consumerTTL) {
+        this.consumerTTL = consumerTTL;
+    }
+
+    /**
+     * @openwire:property version=10
+     */
+    public int getConsumerTTL() {
+        return this.consumerTTL;
+    }
 }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/openwire/v10/NetworkBridgeFilterMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/openwire/v10/NetworkBridgeFilterMarshaller.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/openwire/v10/NetworkBridgeFilterMarshaller.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/openwire/v10/NetworkBridgeFilterMarshaller.java Fri Jun 28 10:32:40 2013
@@ -66,8 +66,9 @@ public class NetworkBridgeFilterMarshall
         super.tightUnmarshal(wireFormat, o, dataIn, bs);
 
         NetworkBridgeFilter info = (NetworkBridgeFilter)o;
-        info.setNetworkTTL(dataIn.readInt());
         info.setNetworkBrokerId((org.apache.activemq.command.BrokerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
+        info.setMessageTTL(dataIn.readInt());
+        info.setConsumerTTL(dataIn.readInt());
 
     }
 
@@ -82,7 +83,7 @@ public class NetworkBridgeFilterMarshall
         int rc = super.tightMarshal1(wireFormat, o, bs);
         rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getNetworkBrokerId(), bs);
 
-        return rc + 4;
+        return rc + 8;
     }
 
     /**
@@ -96,8 +97,9 @@ public class NetworkBridgeFilterMarshall
         super.tightMarshal2(wireFormat, o, dataOut, bs);
 
         NetworkBridgeFilter info = (NetworkBridgeFilter)o;
-        dataOut.writeInt(info.getNetworkTTL());
         tightMarshalCachedObject2(wireFormat, (DataStructure)info.getNetworkBrokerId(), dataOut, bs);
+        dataOut.writeInt(info.getMessageTTL());
+        dataOut.writeInt(info.getConsumerTTL());
 
     }
 
@@ -112,8 +114,9 @@ public class NetworkBridgeFilterMarshall
         super.looseUnmarshal(wireFormat, o, dataIn);
 
         NetworkBridgeFilter info = (NetworkBridgeFilter)o;
-        info.setNetworkTTL(dataIn.readInt());
         info.setNetworkBrokerId((org.apache.activemq.command.BrokerId) looseUnmarsalCachedObject(wireFormat, dataIn));
+        info.setMessageTTL(dataIn.readInt());
+        info.setConsumerTTL(dataIn.readInt());
 
     }
 
@@ -126,8 +129,9 @@ public class NetworkBridgeFilterMarshall
         NetworkBridgeFilter info = (NetworkBridgeFilter)o;
 
         super.looseMarshal(wireFormat, o, dataOut);
-        dataOut.writeInt(info.getNetworkTTL());
         looseMarshalCachedObject(wireFormat, (DataStructure)info.getNetworkBrokerId(), dataOut);
+        dataOut.writeInt(info.getMessageTTL());
+        dataOut.writeInt(info.getConsumerTTL());
 
     }
 }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Fri Jun 28 10:32:40 2013
@@ -389,7 +389,7 @@ public class JmsMultipleBrokersTestSuppo
         }
         return null;
     }
-    
+
     protected void assertConsumersConnect(String brokerName, Destination destination, final int count, long timeout) throws Exception {
         BrokerItem brokerItem = brokers.get(brokerName);
         Connection conn = brokerItem.createConnection();
@@ -528,6 +528,7 @@ public class JmsMultipleBrokersTestSuppo
             this.broker = broker;
 
             factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+            factory.setConnectionIDPrefix(broker.getBrokerName());
             consumers = Collections.synchronizedMap(new HashMap<MessageConsumer, MessageIdList>());
             connections = Collections.synchronizedList(new ArrayList<Connection>());
             allMessages.setVerbose(verbose);

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java?rev=1497716&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java Fri Jun 28 10:32:40 2013
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import junit.framework.Test;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4607Test.class);
+
+    public static final int BROKER_COUNT = 3;
+    public static final int CONSUMER_COUNT = 1;
+    public static final int MESSAGE_COUNT = 0;
+    public static final boolean CONDUIT = true;
+    public static final int TIMEOUT = 20000;
+
+    public boolean duplex = true;
+    protected Map<String, MessageConsumer> consumerMap;
+    Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
+
+    private void assertNoUnhandeledExceptions() {
+        for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
+            LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
+        }
+        assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
+                unhandeledExceptions.isEmpty());
+    }
+
+    public NetworkConnector bridge(String from, String to) throws Exception {
+        NetworkConnector networkConnector = bridgeBrokers(from, to, true, -1, CONDUIT);
+        networkConnector.setSuppressDuplicateQueueSubscriptions(true);
+        networkConnector.setDecreaseNetworkConsumerPriority(true);
+        networkConnector.setConsumerTTL(1);
+        networkConnector.setDuplex(duplex);
+        return networkConnector;
+    }
+
+    public static Test suite() {
+        return suite(AMQ4607Test.class);
+    }
+
+    public void initCombos() {
+        addCombinationValues("duplex", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testMigratingConsumer() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!duplex) bridge("Broker1", "Broker0");
+
+        bridge("Broker1", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker1");
+
+        bridge("Broker0", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker0");
+
+        startAllBrokers();
+        this.waitForBridgeFormation();
+
+        Destination dest = createDestination("TEST.FOO", false);
+        sendMessages("Broker0", dest, 1);
+
+        for (int i=0; i< BROKER_COUNT; i++) {
+            MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
+
+            for (int J = 0; J < BROKER_COUNT; J++) {
+                assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
+            }
+
+            assertNoUnhandeledExceptions();
+
+            assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
+
+            messageConsumer.close();
+            LOG.info("Check for no consumers..");
+            for (int J = 0; J < BROKER_COUNT; J++) {
+        	    assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
+            }
+        }
+
+        // now consume the message
+        final String brokerId = "Broker2";
+        MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
+            }
+        }));
+        messageConsumer.close();
+
+    }
+
+    public void testMigratingConsumerFullCircle() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!duplex) bridge("Broker1", "Broker0");
+
+        bridge("Broker1", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker1");
+
+        bridge("Broker0", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker0");
+
+        // allow full loop, immediate replay back to 0 from 2
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
+        conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
+        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
+        brokers.get("Broker2").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
+        startAllBrokers();
+        this.waitForBridgeFormation();
+
+        Destination dest = createDestination("TEST.FOO", false);
+
+        sendMessages("Broker0", dest, 1);
+
+        for (int i=0; i< BROKER_COUNT; i++) {
+            MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
+
+            for (int J = 0; J < BROKER_COUNT; J++) {
+                assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
+            }
+
+            assertNoUnhandeledExceptions();
+
+            // validate the message has been forwarded
+            assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
+
+            messageConsumer.close();
+            LOG.info("Check for no consumers..");
+            for (int J = 0; J < BROKER_COUNT; J++) {
+        	    assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
+            }
+        }
+
+        // now consume the message from the origin
+        LOG.info("Consume from origin...");
+        final String brokerId = "Broker0";
+        MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
+            }
+        }));
+        messageConsumer.close();
+
+    }
+
+    protected void assertExactMessageCount(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
+        ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
+        final QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+        assertTrue("Excepected queue depth: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                long currentCount = queueViewMBean.getQueueSize();
+                LOG.info("On " + brokerName + " current queue size for " + queueViewMBean + ", " + currentCount);
+                if (count != currentCount) {
+                    LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
+                }
+                return currentCount == count;
+            }
+        }, timeout));
+    }
+
+    protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
+        final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
+        assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                try {
+                    QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+                    long currentCount = queueViewMBean.getConsumerCount();
+                    LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
+                    if (count != currentCount) {
+                        LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
+                    }
+                    return currentCount == count;
+                } catch (Exception e) {
+                    LOG.warn("Unexpected: " + e, e);
+                    return false;
+                }
+            }
+        }, timeout));
+    }
+
+    public void setUp() throws Exception {
+        super.setUp();
+
+        unhandeledExceptions.clear();
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        
+        // Setup n brokers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true"));
+        }
+
+        consumerMap = new LinkedHashMap<String, MessageConsumer>();
+    }
+
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        brokerService.setDestinationPolicy(policyMap);
+    }
+
+    public void uncaughtException(Thread t, Throwable e) {
+        synchronized(unhandeledExceptions) {
+            unhandeledExceptions.put(t,e);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java?rev=1497716&r1=1497715&r2=1497716&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java Fri Jun 28 10:32:40 2013
@@ -324,7 +324,7 @@ public abstract class DataFileGeneratorT
     }
 
     protected BooleanExpression createBooleanExpression(String string) {
-        return new NetworkBridgeFilter(null, new BrokerId(string), 10);
+        return new NetworkBridgeFilter(null, new BrokerId(string), 10, 10);
     }
 
 }

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java?rev=1497716&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java Fri Jun 28 10:32:40 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
+    public static final int BROKER_COUNT = 3;
+    public static final int CONSUMER_COUNT = 1;
+    public static final int MESSAGE_COUNT = 0;
+    public static final boolean DUPLEX = false;
+    public static final boolean CONDUIT = true;
+
+    // NETWORK_TTL=4 is problematic for consumer/demand propagation;
+    // bridge() overrides it with setConsumerTTL(1)
+    public static final int NETWORK_TTL = 4;
+    private static final Logger LOG = LoggerFactory.getLogger(VerifyNetworkConsumersDisconnectTest.class);
+    public static final int TIMEOUT = 30000;
+
+    protected Map<String, MessageConsumer> consumerMap;
+    Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
+
+    private void assertNoUnhandeledExceptions() {
+        for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
+            LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
+        }
+        assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
+                unhandeledExceptions.isEmpty());
+    }
+
+    public NetworkConnector bridge(String from, String to) throws Exception {
+        NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT);
+        networkConnector.setSuppressDuplicateQueueSubscriptions(true);
+        networkConnector.setDecreaseNetworkConsumerPriority(true);
+        networkConnector.setDuplex(DUPLEX);
+        // infinite ttl for messages in a mesh
+        networkConnector.setMessageTTL(-1);
+        // one hop for consumers in a mesh
+        networkConnector.setConsumerTTL(1);
+        return networkConnector;
+    }
+
+    public void testQueueAllConnected() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!DUPLEX) bridge("Broker1", "Broker0");
+
+        bridge("Broker1", "Broker2");
+        if (!DUPLEX) bridge("Broker2", "Broker1");
+
+        startAllBrokers();
+        this.waitForBridgeFormation();
+
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
+        }
+
+        assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
+        assertExactConsumersConnect("Broker2", dest, 2, TIMEOUT);
+        // piggy in the middle
+        assertExactConsumersConnect("Broker1", dest, 3, TIMEOUT);
+
+        assertNoUnhandeledExceptions();
+
+        LOG.info("Complate the mesh - 0->2");
+
+        // shorter route
+        NetworkConnector nc = bridge("Broker0", "Broker2");
+        nc.setBrokerName("Broker0");
+        nc.start();
+
+
+        if (!DUPLEX) {
+            LOG.info("... complate the mesh - 2->0");
+            nc = bridge("Broker2", "Broker0");
+            nc.setBrokerName("Broker2");
+            nc.start();
+        }
+
+        // wait for consumers to get propagated
+        for (int i = 0; i < BROKER_COUNT; i++) {
+        	assertExactConsumersConnect("Broker" + i, dest, 3, TIMEOUT);
+        }
+
+        // reverse order close
+        consumerMap.get("Consumer:" + 2 + ":0").close();
+        TimeUnit.SECONDS.sleep(1);
+        consumerMap.get("Consumer:" + 1 + ":0").close();
+        TimeUnit.SECONDS.sleep(1);
+        consumerMap.get("Consumer:" + 0 + ":0").close();
+
+        LOG.info("Check for no consumers..");
+        for (int i = 0; i < BROKER_COUNT; i++) {
+        	assertExactConsumersConnect("Broker" + i, dest, 0, TIMEOUT);
+        }
+
+    }
+
+    protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
+        final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
+        assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                try {
+                    QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+                    long currentCount = queueViewMBean.getConsumerCount();
+                    LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
+                    if (count != currentCount) {
+                        LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
+                    }
+                    return currentCount == count;
+                } catch (Exception e) {
+                    LOG.warn(": ", e);
+                    return false;
+                }
+            }
+        }, timeout));
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+
+        unhandeledExceptions.clear();
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        
+        // Setup n brokers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
+        }
+
+        consumerMap = new LinkedHashMap<String, MessageConsumer>();
+    }
+
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        brokerService.setDestinationPolicy(policyMap);
+    }
+
+    public void uncaughtException(Thread t, Throwable e) {
+        synchronized(unhandeledExceptions) {
+            unhandeledExceptions.put(t,e);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message