Author: gtully
Date: Wed Dec 9 21:38:23 2009
New Revision: 888988
URL: http://svn.apache.org/viewvc?rev=888988&view=rev
Log:
svn merge -c 888974 - resolve https://issues.apache.org/activemq/browse/AMQ-2527 - add timeout
to waitForSlave and make the following more reseliant to slow machines, VMTransportWaitForTest,MasterSlaveSlaveDieTest,SimpleNetworkTest,NetworkBrokerDetachTest,DuplexNetworkMBeanTest,MultiBrokersMultiClientsTest,AMQ2102Test
- related to changes for https://issues.apache.org/activemq/browse/AMQ-1112
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Wed Dec 9 21:38:23 2009
@@ -116,6 +116,7 @@
private boolean shutdownOnMasterFailure;
private boolean shutdownOnSlaveFailure;
private boolean waitForSlave;
+ private long waitForSlaveTimeout = 600000L;
private boolean passiveSlave;
private String brokerName = DEFAULT_BROKER_NAME;
private File dataDirectoryFile;
@@ -1908,7 +1909,9 @@
protected void waitForSlave() {
try {
- slaveStartSignal.await();
+ if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
+ throw new IllegalStateException("Gave up waiting for slave to start after "
+ waitForSlaveTimeout + " milliseconds.");
+ }
} catch (InterruptedException e) {
LOG.error("Exception waiting for slave:" + e);
}
@@ -2105,7 +2108,15 @@
public void setWaitForSlave(boolean waitForSlave) {
this.waitForSlave = waitForSlave;
}
-
+
+ public long getWaitForSlaveTimeout() {
+ return this.waitForSlaveTimeout;
+ }
+
+ public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
+ this.waitForSlaveTimeout = waitForSlaveTimeout;
+ }
+
public CountDownLatch getSlaveStartSignal() {
return slaveStartSignal;
}
@@ -2132,4 +2143,4 @@
}
-}
\ No newline at end of file
+}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed Dec 9 21:38:23 2009
@@ -700,7 +700,6 @@
try{
if(node!=null){
Message message=node.getMessage();
- stampAsExpired(message);
if(message!=null && node.getRegionDestination()!=null){
DeadLetterStrategy deadLetterStrategy=node
.getRegionDestination().getDeadLetterStrategy();
@@ -708,6 +707,7 @@
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
// message may be inflight to other subscriptions so do not modify
message = message.copy();
+ stampAsExpired(message);
message.setExpiration(0);
if(!message.isPersistent()){
message.setPersistent(true);
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/MasterSlaveSlaveDieTest.java
Wed Dec 9 21:38:23 2009
@@ -54,7 +54,10 @@
final BrokerService master = new BrokerService();
master.setBrokerName("master");
master.setPersistent(false);
- master.addConnector("tcp://localhost:0");
+ // The wireformat negotiation timeout (defaults to same as
+ // MaxInactivityDurationInitalDelay) needs to be a bit longer
+ // on slow running machines - set it to 90 seconds.
+ master.addConnector("tcp://localhost:0?wireFormat.maxInactivityDurationInitalDelay=90000");
master.setWaitForSlave(true);
master.setPlugins(new BrokerPlugin[] { new Plugin() });
@@ -73,6 +76,7 @@
try {
master.start();
} catch (Exception e) {
+ LOG.warn("Exception starting master: " + e);
e.printStackTrace();
}
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
Wed Dec 9 21:38:23 2009
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.network;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNotNull;
+
import java.net.MalformedURLException;
import java.util.Set;
@@ -26,13 +29,13 @@
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
-import junit.framework.TestCase;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class DuplexNetworkMBeanTest extends TestCase {
+import org.junit.Test;
+
+public class DuplexNetworkMBeanTest {
protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
protected final int numRestarts = 3;
@@ -54,10 +57,11 @@
return broker;
}
+ @Test
public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception {
BrokerService broker = createBroker();
broker.start();
- assertEquals(1, countMbeans(broker, "Connector", 10000));
+ assertEquals(1, countMbeans(broker, "Connector", 30000));
assertEquals(0, countMbeans(broker, "Connection"));
BrokerService networkedBroker = null;
for (int i=0; i<numRestarts; i++) {
@@ -78,11 +82,12 @@
broker.waitUntilStopped();
}
+ @Test
public void testMbeanPresenceOnBrokerRestart() throws Exception {
BrokerService networkedBroker = createNetworkedBroker();
networkedBroker.start();
- assertEquals(1, countMbeans(networkedBroker, "Connector", 10000));
+ assertEquals(1, countMbeans(networkedBroker, "Connector", 30000));
assertEquals(0, countMbeans(networkedBroker, "Connection"));
BrokerService broker = null;
@@ -129,6 +134,14 @@
}
}
} while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
+
+ // If port 1099 is in use when the Broker starts, starting the jmx
+ // connector will fail. So, if we have no mbsc to query, skip the
+ // test.
+ if (timeout > 0) {
+ assumeNotNull(mbeans);
+ }
+
return count;
}
@@ -147,6 +160,7 @@
LOG.info(bean.getObjectName());
}
} catch (Exception ignored) {
+ LOG.warn("getMBeanServer ex: " + ignored);
}
return mbsc;
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
Wed Dec 9 21:38:23 2009
@@ -16,28 +16,31 @@
*/
package org.apache.activemq.network;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeNotNull;
+
import java.net.MalformedURLException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.MessageConsumer;
import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
-import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
-public class NetworkBrokerDetachTest extends TestCase {
+public class NetworkBrokerDetachTest {
private final static String BROKER_NAME = "broker";
private final static String REM_BROKER_NAME = "networkedBroker";
@@ -63,6 +66,7 @@
return broker;
}
+ @Test
public void testNetworkedBrokerDetach() throws Exception {
BrokerService broker = createBroker();
broker.start();
@@ -77,29 +81,52 @@
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
for(int i=0; i<NUM_CONSUMERS; i++) {
- MessageConsumer consumer = consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
+ consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
}
-
- Thread.sleep(5000);
-
- MBeanServerConnection mbsc = getMBeanServerConnection();
- // We should have 1 consumer for the queue on the local broker
- Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
- LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
- assertEquals(1L, ((Long)consumers).longValue());
+ assertTrue("got expected consumer count from mbean within time limit", Wait.waitFor(new
Wait.Condition() {
+
+ public boolean isSatisified() throws Exception {
+ boolean result = false;
+ MBeanServerConnection mbsc = getMBeanServerConnection();
+ if (mbsc != null) {
+ // We should have 1 consumer for the queue on the local broker
+ Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME,
"ConsumerCount");
+ if (consumers != null) {
+ LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + "
: " + consumers);
+ if (1L == ((Long)consumers).longValue()) {
+ result = true;
+ }
+ }
+ }
+ return result;
+ }
+ }));
LOG.info("Stopping Consumer on the networked broker ...");
// Closing the connection will also close the consumer
consConn.close();
- Thread.sleep(5000);
-
// We should have 0 consumer for the queue on the local broker
- consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
- LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
- assertEquals(0L, ((Long)consumers).longValue());
+ assertTrue("got expected 0 count from mbean within time limit", Wait.waitFor(new
Wait.Condition() {
+
+ public boolean isSatisified() throws Exception {
+ boolean result = false;
+ MBeanServerConnection mbsc = getMBeanServerConnection();
+ if (mbsc != null) {
+ // We should have 1 consumer for the queue on the local broker
+ Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME,
"ConsumerCount");
+ if (consumers != null) {
+ LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + "
: " + consumers);
+ if (0L == ((Long)consumers).longValue()) {
+ result = true;
+ }
+ }
+ }
+ return result;
+ }
+ }));
networkedBroker.stop();
networkedBroker.waitUntilStopped();
@@ -134,21 +161,23 @@
try {
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
mbsc = jmxc.getMBeanServerConnection();
-
-// // trace all existing MBeans
-// Set<?> all = mbsc.queryMBeans(null, null);
-// LOG.info("Total MBean count=" + all.size());
-// for (Object o : all) {
-// ObjectInstance bean = (ObjectInstance)o;
-// LOG.info(bean.getObjectName());
-// }
} catch (Exception ignored) {
+ LOG.warn("getMBeanServer ex: " + ignored);
}
+ // If port 1099 is in use when the Broker starts, starting the jmx
+ // connector will fail. So, if we have no mbsc to query, skip the
+ // test.
+ assumeNotNull(mbsc);
return mbsc;
}
private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern,
String attrName) throws Exception {
- Object obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
+ Object obj = null;
+ try {
+ obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
+ } catch (InstanceNotFoundException ignored) {
+ LOG.warn("getAttribute ex: " + ignored);
+ }
return obj;
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
Wed Dec 9 21:38:23 2009
@@ -81,7 +81,7 @@
TopicRequestor requestor = new TopicRequestor((TopicSession)localSession, included);
// allow for consumer infos to perculate arround
- Thread.sleep(2000);
+ Thread.sleep(5000);
for (int i = 0; i < MESSAGE_COUNT; i++) {
TextMessage msg = localSession.createTextMessage("test msg: " + i);
TextMessage result = (TextMessage)requestor.request(msg);
@@ -110,16 +110,16 @@
MessageConsumer consumer2 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Thread.sleep(1000);
+ Thread.sleep(2000);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
- assertNotNull(consumer1.receive(500));
- assertNotNull(consumer2.receive(500));
+ assertNotNull(consumer1.receive(1000));
+ assertNotNull(consumer2.receive(1000));
}
// ensure no more messages received
- assertNull(consumer1.receive(500));
- assertNull(consumer2.receive(500));
+ assertNull(consumer1.receive(1000));
+ assertNull(consumer2.receive(1000));
}
public void testDurableStoreAndForward() throws Exception {
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
Wed Dec 9 21:38:23 2009
@@ -57,7 +57,7 @@
} catch (Exception e) {
e.printStackTrace();
- fail("unexpected exception:" + e);
+ fail("unexpected exception: " + e);
}
}
};
@@ -70,7 +70,7 @@
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616");
broker.start();
- assertTrue("has got connection", gotConnection.await(200, TimeUnit.MILLISECONDS));
+ assertTrue("has got connection", gotConnection.await(400, TimeUnit.MILLISECONDS));
broker.stop();
}
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java?rev=888988&r1=888987&r2=888988&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
Wed Dec 9 21:38:23 2009
@@ -65,7 +65,7 @@
// wait for consumers to get propagated
for (int i = 1; i <= BROKER_COUNT; i++) {
// all consumers on the remote brokers look like 1 consumer to the local broker.
- assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000);
+ assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 65000);
}
// Send messages
@@ -115,7 +115,7 @@
// wait for consumers to get propagated
for (int i = 1; i <= BROKER_COUNT; i++) {
// all consumers on the remote brokers look like 1 consumer to the local broker.
- assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000);
+ assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 65000);
}
// Send messages
|