Author: gtully
Date: Wed Jul 6 16:11:03 2011
New Revision: 1143482
URL: http://svn.apache.org/viewvc?rev=1143482&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3353 - Durable subscribers on durable topics don't
receive messages after network disconnect. Suppression for durable subs is now conditional
on the sub being active, so that reconnection/recreation/failover of the bridge is not a problem. org.apache.activemq.usecases.NoDuplicateOnTopicNetworkTest
still needs some work to deal with "already exists" errors from duplicate restarts.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1143482&r1=1143481&r2=1143482&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Jul 6 16:11:03 2011
@@ -32,8 +32,10 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.AbstractRegion;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -1046,7 +1048,7 @@ public abstract class DemandForwardingBr
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
if (matchFound(candidateConsumers, networkConsumers)) {
- suppress = hasLowerPriority(sub, candidate.getLocalInfo());
+ suppress = isActiveDurableSub(sub) && hasLowerPriority(sub, candidate.getLocalInfo());
break;
}
}
@@ -1054,6 +1056,10 @@ public abstract class DemandForwardingBr
return suppress;
}
+ private boolean isActiveDurableSub(Subscription sub) {
+ return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription
&& ((DurableTopicSubscription)sub).isActive());
+ }
+
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo)
{
boolean suppress = false;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=1143482&r1=1143481&r2=1143482&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
Wed Jul 6 16:11:03 2011
@@ -51,14 +51,14 @@ public class BrokerQueueNetworkWithDisco
private SocketProxy socketProxy;
private long networkDownTimeStart;
public boolean useDuplexNetworkBridge = true;
- public boolean sumulateStalledNetwork;
+ public boolean simulateStalledNetwork;
private long inactiveDuration = 1000;
private boolean useSocketProxy = true;
public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE, Boolean.FALSE}
);
- addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } );
+ addCombinationValues( "simulateStalledNetwork", new Object[]{ Boolean.TRUE } );
}
public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
@@ -197,7 +197,7 @@ public class BrokerQueueNetworkWithDisco
protected void onSend(int i, TextMessage msg) {
sleep(50);
if (i == 50 || i == 150) {
- if (sumulateStalledNetwork) {
+ if (simulateStalledNetwork) {
socketProxy.pause();
} else {
socketProxy.close();
@@ -206,7 +206,7 @@ public class BrokerQueueNetworkWithDisco
} else if (networkDownTimeStart > 0) {
// restart after NETWORK_DOWN_TIME seconds
if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis())
{
- if (sumulateStalledNetwork) {
+ if (simulateStalledNetwork) {
socketProxy.goOn();
} else {
socketProxy.reopen();
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java?rev=1143482&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java
Wed Jul 6 16:11:03 2011
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class DurableSubscriberWithNetworkDisconnectTest extends JmsMultipleBrokersTestSupport
{
+ private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkDisconnectTest.class);
+ private static final int NETWORK_DOWN_TIME = 10000;
+ private static final String HUB = "HubBroker";
+ private static final String SPOKE = "SpokeBroker";
+ private SocketProxy socketProxy;
+ private long networkDownTimeStart;
+ private long inactiveDuration = 1000;
+ private long receivedMsgs = 0;
+ private boolean useSocketProxy = true;
+ protected static final int MESSAGE_COUNT = 200;
+ public boolean useDuplexNetworkBridge = true;
+ public boolean simulateStalledNetwork;
+ public boolean dynamicOnly = true;
+ public long networkTTL = 3;
+ public boolean exponentialBackOff;
+ public boolean failover = false;
+ public boolean inactivity = true;
+
+ public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
+ addCombinationValues("failover", new Object[]{Boolean.FALSE, Boolean.TRUE});
+ }
+
+ public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
+ bridgeBrokers(SPOKE, HUB);
+
+ startAllBrokers();
+
+ // Setup connection
+ URI hubURI = brokers.get(HUB).broker.getVmConnectorURI();
+ URI spokeURI = brokers.get(SPOKE).broker.getVmConnectorURI();
+ ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
+ ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
+ Connection conHub = facHub.createConnection();
+ Connection conSpoke = facSpoke.createConnection();
+ conHub.setClientID("clientHUB");
+ conSpoke.setClientID("clientSPOKE");
+ conHub.start();
+ conSpoke.start();
+ Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
+ String consumerName = "consumerName";
+
+ // Setup consumers
+ MessageConsumer remoteConsumer = sesSpoke.createDurableSubscriber(topic, consumerName);
+ remoteConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message msg) {
+ try {
+ TextMessage textMsg = (TextMessage) msg;
+ receivedMsgs++;
+ LOG.info("Received messages (" + receivedMsgs + "): " + textMsg.getText());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // allow subscription information to flow back to Spoke
+ sleep(1000);
+
+ // Setup producer
+ MessageProducer localProducer = sesHub.createProducer(topic);
+ localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ // Send messages
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ sleep(50);
+ if (i == 50 || i == 150) {
+ if (simulateStalledNetwork) {
+ socketProxy.pause();
+ } else {
+ socketProxy.close();
+ }
+ networkDownTimeStart = System.currentTimeMillis();
+ } else if (networkDownTimeStart > 0) {
+ // restart after NETWORK_DOWN_TIME seconds
+ sleep(NETWORK_DOWN_TIME);
+ networkDownTimeStart = 0;
+ if (simulateStalledNetwork) {
+ socketProxy.goOn();
+ } else {
+ socketProxy.reopen();
+ }
+ } else {
+ // slow message production to allow bridge to recover and limit message duplication
+ sleep(500);
+ }
+ Message test = sesHub.createTextMessage("test-" + i);
+ localProducer.send(test);
+ }
+
+ LOG.info("waiting for messages to flow");
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return receivedMsgs >= MESSAGE_COUNT;
+ }
+ });
+
+ assertTrue("At least message " + MESSAGE_COUNT +
+ " must be received, count=" + receivedMsgs,
+ MESSAGE_COUNT <= receivedMsgs);
+ brokers.get(HUB).broker.deleteAllMessages();
+ brokers.get(SPOKE).broker.deleteAllMessages();
+ conHub.close();
+ conSpoke.close();
+ }
+
+ @Override
+ protected void startAllBrokers() throws Exception {
+ // Ensure HUB is started first so bridge will be active from the get go
+ BrokerItem brokerItem = brokers.get(HUB);
+ brokerItem.broker.start();
+ brokerItem = brokers.get(SPOKE);
+ brokerItem.broker.start();
+ sleep(600);
+ }
+
+ public void setUp() throws Exception {
+ networkDownTimeStart = 0;
+ inactiveDuration = 1000;
+ useSocketProxy = true;
+ receivedMsgs = 0;
+ super.setAutoFail(true);
+ super.setUp();
+ final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
+ createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
+ createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+ }
+
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (socketProxy != null) {
+ socketProxy.close();
+ }
+ }
+
+ public static Test suite() {
+ return suite(DurableSubscriberWithNetworkDisconnectTest.class);
+ }
+
+ private void sleep(int milliSecondTime) {
+ try {
+ Thread.sleep(milliSecondTime);
+ } catch (InterruptedException igonred) {
+ }
+ }
+
+ @Override
+ protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,
boolean l_dynamicOnly, int networkTTL, boolean l_conduit, boolean l_failover) throws Exception
{
+ List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+ URI remoteURI;
+ if (!transportConnectors.isEmpty()) {
+ remoteURI = ((TransportConnector) transportConnectors.get(0)).getConnectUri();
+ if (useSocketProxy) {
+ socketProxy = new SocketProxy(remoteURI);
+ remoteURI = socketProxy.getUrl();
+ }
+ String options = "";
+ if (failover) {
+ options = "static:(failover:(" + remoteURI;
+ } else {
+ options = "static:(" + remoteURI;
+ }
+ if (inactivity) {
+ options += "?wireFormat.maxInactivityDuration=" + inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay="
+ inactiveDuration + ")";
+ } else {
+ options += ")";
+ }
+
+ if (failover) {
+ options += "?maxReconnectAttempts=1)";
+ }
+
+ options += "?useExponentialBackOff=" + exponentialBackOff;
+ DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(options));
+ connector.setDynamicOnly(dynamicOnly);
+ connector.setNetworkTTL(networkTTL);
+ localBroker.addNetworkConnector(connector);
+ maxSetupTime = 2000;
+ if (useDuplexNetworkBridge) {
+ connector.setDuplex(true);
+ }
+ return connector;
+ } else {
+ throw new Exception("Remote broker has no registered connectors.");
+ }
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java?rev=1143482&r1=1143481&r2=1143482&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
Wed Jul 6 16:11:03 2011
@@ -38,7 +38,6 @@ public class MulticastDiscoveryOnFaultyN
private static final String HUB = "HubBroker";
private static final String SPOKE = "SpokeBroker";
public boolean useDuplexNetworkBridge;
- public boolean sumulateStalledNetwork;
private TransportConnector mCastTrpConnector;
@@ -121,9 +120,9 @@ public class MulticastDiscoveryOnFaultyN
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
if (!transportConnectors.isEmpty()) {
- mCastTrpConnector = ((TransportConnector)transportConnectors.get(0));
- mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC"));
- }
- return connector;
+ mCastTrpConnector = ((TransportConnector)transportConnectors.get(0));
+ mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC"));
+ }
+ return connector;
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java?rev=1143482&r1=1143481&r2=1143482&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
Wed Jul 6 16:11:03 2011
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -66,10 +67,12 @@ public class NoDuplicateOnTopicNetworkTe
public boolean suppressDuplicateTopicSubs = false;
public DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
+ public boolean durableSub = false;
+ AtomicInteger idCounter = new AtomicInteger(0);
private boolean dynamicOnly = false;
// no duplicates in cyclic network if networkTTL <=1
- // when > 1, subscriptions perculate around resulting in duplicates as there is no
+ // when > 1, subscriptions percolate around resulting in duplicates as there is no
// memory of the original subscription.
// solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds()
private int ttl = 3;
@@ -114,6 +117,7 @@ public class NoDuplicateOnTopicNetworkTe
private BrokerService createAndStartBroker(String name, String addr)
throws Exception {
BrokerService broker = new BrokerService();
+ //broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName(name);
broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT));
broker.setUseJmx(false);
@@ -148,8 +152,9 @@ public class NoDuplicateOnTopicNetworkTe
}
public void initCombosForTestProducerConsumerTopic() {
- this.addCombinationValues("suppresDuplicateTopicSubs", new Object[]{Boolean.TRUE,
Boolean.FALSE});
+ this.addCombinationValues("suppressDuplicateTopicSubs", new Object[]{Boolean.TRUE,
Boolean.FALSE});
this.addCombinationValues("dispatchPolicy", new Object[]{new PriorityNetworkDispatchPolicy(),
new SimpleDispatchPolicy()});
+ this.addCombinationValues("durableSub", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testProducerConsumerTopic() throws Exception {
@@ -206,6 +211,7 @@ public class NoDuplicateOnTopicNetworkTe
}
map.put(msg, msg);
}
+ consumer.unSubscribe();
if (suppressDuplicateTopicSubs || dispatchPolicy instanceof PriorityNetworkDispatchPolicy)
{
assertEquals("no duplicates", 0, duplicateCount);
assertEquals("got all required messages: " + map.size(), consumer
@@ -227,6 +233,7 @@ public class NoDuplicateOnTopicNetworkTe
private Topic topic;
private MessageProducer producer;
private MessageConsumer consumer;
+ private final String durableID = "DURABLE_ID";
private List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>());
private int numMessages = 10;
@@ -262,6 +269,7 @@ public class NoDuplicateOnTopicNetworkTe
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
brokerURL);
connection = factory.createConnection();
+ connection.setClientID("ID" + idCounter.incrementAndGet());
}
private void createTopic() throws JMSException {
@@ -274,7 +282,11 @@ public class NoDuplicateOnTopicNetworkTe
}
private void createConsumer() throws JMSException {
- consumer = session.createConsumer(topic);
+ if (durableSub) {
+ consumer = session.createDurableSubscriber(topic, durableID);
+ } else {
+ consumer = session.createConsumer(topic);
+ }
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
@@ -319,5 +331,14 @@ public class NoDuplicateOnTopicNetworkTe
public int getNumMessages() {
return numMessages;
}
+
+ public void unSubscribe() throws Exception {
+ consumer.close();
+ if (durableSub) {
+ session.unsubscribe(durableID);
+ // ensure un-subscription has percolated though the network
+ Thread.sleep(2000);
+ }
+ }
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=1143482&r1=1143481&r2=1143482&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Wed
Jul 6 16:11:03 2011
@@ -134,7 +134,7 @@ public class SocketProxy {
synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections);
}
- LOG.info("close, numConnectons=" + connections.size());
+ LOG.info("close, numConnections=" + connections.size());
for (Bridge con : connections) {
closeConnection(con);
}
@@ -151,7 +151,7 @@ public class SocketProxy {
synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections);
}
- LOG.info("halfClose, numConnectons=" + connections.size());
+ LOG.info("halfClose, numConnections=" + connections.size());
for (Bridge con : connections) {
halfCloseConnection(con);
}
@@ -174,12 +174,12 @@ public class SocketProxy {
}
/*
- * pause accepting new connecitons and data transfer through existing proxy
+ * pause accepting new connections and data transfer through existing proxy
* connections. All sockets remain open
*/
public void pause() {
synchronized(connections) {
- LOG.info("pause, numConnectons=" + connections.size());
+ LOG.info("pause, numConnections=" + connections.size());
acceptor.pause();
for (Bridge con : connections) {
con.pause();
@@ -192,7 +192,7 @@ public class SocketProxy {
*/
public void goOn() {
synchronized(connections) {
- LOG.info("goOn, numConnectons=" + connections.size());
+ LOG.info("goOn, numConnections=" + connections.size());
for (Bridge con : connections) {
con.goOn();
}
|