Author: gtully
Date: Fri Oct 10 06:11:49 2008
New Revision: 703447
URL: http://svn.apache.org/viewvc?rev=703447&view=rev
Log:
revert/rework AMQ-1521, fix for AMQ-1973; new test case and SocketProxy that can be used to
simulate network outage
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Oct 10 06:11:49 2008
@@ -420,7 +420,14 @@
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()))
{
serviceRemoteConsumerAdvisory(message.getDataStructure());
} else {
- localBroker.oneway(message);
+ if (message.isResponseRequired()) {
+ Response reply = new Response();
+ reply.setCorrelationId(message.getCommandId());
+ localBroker.oneway(message);
+ remoteBroker.oneway(reply);
+ } else {
+ localBroker.oneway(message);
+ }
}
} else {
switch (command.getDataStructureType()) {
@@ -436,6 +443,10 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring ConsumerInfo: "+ command);
}
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding ConsumerInfo: "+ command);
+ }
}
} else {
// received a subscription whilst stopping
@@ -606,10 +617,10 @@
Message message = configureMessage(md);
if (trace) {
LOG.trace("bridging " + configuration.getBrokerName() + " ->
" + remoteBrokerName + ": " + message);
- LOG.trace("cameFromRemote = "+cameFromRemote);
+ LOG.trace("cameFromRemote = "+cameFromRemote + ", repsonseRequired
= " + message.isResponseRequired());
}
- if (!message.isResponseRequired() || isDuplex()) {
+ if (!message.isResponseRequired()) {
// If the message was originally sent using async
// send, we will preserve that QOS
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Fri Oct 10 06:11:49 2008
@@ -177,7 +177,7 @@
* reads packets from a Socket
*/
public void run() {
- LOG.trace("TCP consumer thread starting");
+ LOG.trace("TCP consumer thread for " + this + " starting");
this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Fri Oct 10 06:11:49 2008
@@ -269,6 +269,7 @@
for (int i = 0; i < count; i++) {
TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" +
i);
producer.send(msg);
+ onSend(i, msg);
}
producer.close();
@@ -277,6 +278,9 @@
brokerItem.connections.remove(conn);
}
+ /**
+  * Hook called from the message-sending loop immediately after each
+  * producer.send(msg). This implementation is intentionally empty.
+  *
+  * @param i loop index of the message that was just sent (starts at 0)
+  * @param msg the message that was just sent
+  */
+ protected void onSend(int i, TextMessage msg) {
+ }
+
protected TextMessage createTextMessage(Session session, String initText) throws Exception
{
TextMessage msg = session.createTextMessage();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
Fri Oct 10 06:11:49 2008
@@ -95,7 +95,7 @@
assertEquals(0, countMbeans(broker, "stopped"));
}
- assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
+ //assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
assertEquals(1, countMbeans(networkedBroker, "Connector"));
assertEquals(0, countMbeans(networkedBroker, "Connection"));
assertEquals(0, countMbeans(broker, "Connection"));
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=703447&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
Fri Oct 10 06:11:49 2008
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SocketProxy;
+
+
+public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport {
+ private static final int NETWORK_DOWN_TIME = 5000;
+ protected static final int MESSAGE_COUNT = 200;
+ private static final String HUB = "HubBroker";
+ private static final String SPOKE = "SpokeBroker";
+ private SocketProxy socketProxy;
+ private long networkDownTimeStart;
+ public boolean useDuplexNetworkBridge;
+ public boolean sumulateStalledNetwork;
+
+
+    /**
+     * Registers the values to be combined for
+     * {@link #testSendOnAReceiveOnBWithTransportDisconnect()}: the bridge is tried both
+     * duplex and non-duplex, and the outage is always simulated by stalling (pausing)
+     * the proxy rather than closing it.
+     *
+     * The string keys are the names of public fields of this class and must match them
+     * exactly, including the existing spelling of {@code sumulateStalledNetwork}.
+     */
+    public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
+        addCombinationValues("useDuplexNetworkBridge", new Object[] {Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("sumulateStalledNetwork", new Object[] {Boolean.TRUE});
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages on the spoke broker while
+     * {@link #onSend(int, TextMessage)} interrupts the proxied bridge, and checks that a
+     * consumer on the hub broker receives at least that many messages.
+     */
+    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
+        bridgeBrokers(SPOKE, HUB);
+        startAllBrokers();
+
+        // The only consumer lives on the hub.
+        Destination destination = createDestination("TEST.FOO", false);
+        MessageConsumer hubConsumer = createConsumer(HUB, destination);
+
+        // Give the subscription time to propagate back to the spoke.
+        sleep(600);
+
+        // Produce on the spoke; the network is disturbed from within onSend().
+        sendMessages(SPOKE, destination, MESSAGE_COUNT);
+
+        MessageIdList received = getConsumerMessages(HUB, hubConsumer);
+        received.waitForMessagesToArrive(MESSAGE_COUNT);
+
+        assertTrue("At least message " + MESSAGE_COUNT +
+                " must be recieved, duplicates are expected, count=" + received.getMessageCount(),
+                received.getMessageCount() >= MESSAGE_COUNT);
+    }
+
+
+    /**
+     * Starts the hub broker before the spoke broker, so that the bridge can be active as
+     * soon as the spoke comes up, then waits briefly.
+     */
+    @Override
+    protected void startAllBrokers() throws Exception {
+        // Order matters: HUB first, then SPOKE.
+        for (String brokerName : new String[] {HUB, SPOKE}) {
+            BrokerItem item = brokers.get(brokerName);
+            item.broker.start();
+        }
+        sleep(600);
+    }
+
+    /**
+     * Resets the outage timer, enables auto-fail, runs the parent set-up and then creates
+     * the hub broker (port 61617) and the spoke broker (port 61616), both persistent, with
+     * JMX disabled and with all messages deleted on start-up.
+     */
+    public void setUp() throws Exception {
+        networkDownTimeStart = 0;
+        super.setAutoFail(true);
+        super.setUp();
+
+        final String brokerOptions = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
+        final URI hubUri = new URI("broker:(tcp://localhost:61617)/" + HUB + brokerOptions);
+        createBroker(hubUri);
+        final URI spokeUri = new URI("broker:(tcp://localhost:61616)/" + SPOKE + brokerOptions);
+        createBroker(spokeUri);
+    }
+
+    /**
+     * Builds the JUnit suite for this test class by delegating to the inherited
+     * {@code suite(Class)} factory.
+     */
+    public static Test suite() {
+        final Class<BrokerQueueNetworkWithDisconnectTest> testClass =
+                BrokerQueueNetworkWithDisconnectTest.class;
+        return suite(testClass);
+    }
+
+    /**
+     * Called after each message is sent. Throttles the producer, interrupts the proxied
+     * network link when messages 50 and 150 are sent, and restores the link once it has
+     * been down for more than {@link #NETWORK_DOWN_TIME} milliseconds.
+     *
+     * @param i loop index of the message that was just sent
+     * @param msg the message that was just sent
+     */
+    @Override
+    protected void onSend(int i, TextMessage msg) {
+        sleep(50);
+        if (i == 50 || i == 150) {
+            takeNetworkDown();
+            networkDownTimeStart = System.currentTimeMillis();
+        } else if (networkDownTimeStart > 0) {
+            // restore the link after NETWORK_DOWN_TIME milliseconds of outage
+            if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis()) {
+                bringNetworkUp();
+                networkDownTimeStart = 0;
+            } else {
+                // slow message production to allow bridge to recover and limit message duplication
+                sleep(500);
+            }
+        }
+        super.onSend(i, msg);
+    }
+
+    /** Interrupts the link: pauses the proxy for a stalled network, otherwise closes it. */
+    private void takeNetworkDown() {
+        if (sumulateStalledNetwork) {
+            socketProxy.pause();
+        } else {
+            socketProxy.close();
+        }
+    }
+
+    /** Restores the link: resumes a paused proxy, otherwise reopens the closed one. */
+    private void bringNetworkUp() {
+        if (sumulateStalledNetwork) {
+            socketProxy.goOn();
+        } else {
+            socketProxy.reopen();
+        }
+    }
+
+    /**
+     * Sleeps for the given time. If the thread is interrupted the sleep ends early and the
+     * thread's interrupt status is restored, so code further up the stack can still see
+     * that an interrupt was requested.
+     *
+     * @param milliSecondTime time to sleep, in milliseconds
+     */
+    private void sleep(int milliSecondTime) {
+        try {
+            Thread.sleep(milliSecondTime);
+        } catch (InterruptedException interrupted) {
+            // Thread.sleep clears the flag when it throws; put it back.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+    /**
+     * Bridges {@code localBroker} to {@code remoteBroker} through a {@link SocketProxy}
+     * placed in front of the remote broker's first transport connector, so the test can
+     * pause or cut the link. The proxy is kept in {@link #socketProxy}.
+     *
+     * @param localBroker broker that gets the network connector
+     * @param remoteBroker broker whose first transport connector is proxied
+     * @param dynamicOnly passed to {@code setDynamicOnly} on the connector
+     * @param networkTTL passed to {@code setNetworkTTL} on the connector
+     * @return the network connector added to {@code localBroker}
+     * @throws Exception if the remote broker has no transport connectors, or if the proxy
+     *         or connector cannot be created
+     */
+    @Override
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,
+            boolean dynamicOnly, int networkTTL) throws Exception {
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        if (transportConnectors.isEmpty()) {
+            throw new Exception("Remote broker has no registered connectors.");
+        }
+
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        socketProxy = new SocketProxy(remoteURI);
+
+        // Connect via the proxy; a short inactivity timeout lets the bridge notice the
+        // simulated outage quickly.
+        URI bridgeUri = new URI("static:(" + socketProxy.getUrl()
+                + "?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000)?useExponentialBackOff=false");
+        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(bridgeUri);
+        connector.setDynamicOnly(dynamicOnly);
+        connector.setNetworkTTL(networkTTL);
+        localBroker.addNetworkConnector(connector);
+        // NOTE(review): maxSetupTime is inherited; presumably a set-up wait in ms - confirm.
+        maxSetupTime = 2000;
+        if (useDuplexNetworkBridge) {
+            connector.setDuplex(true);
+        }
+        return connector;
+    }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=703447&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Fri
Oct 10 06:11:49 2008
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SocketProxy {
+
+ private static final transient Log LOG = LogFactory.getLog(SocketProxy.class);
+
+ public static final int ACCEPT_TIMEOUT_MILLIS = 1000;
+
+ private URI proxyUrl;
+ private URI target;
+ private Acceptor acceptor;
+ private ServerSocket serverSocket;
+
+ public List<Connection> connections = new LinkedList<Connection>();
+
+ private int listenPort = 0;
+
+    /**
+     * Creates a proxy in front of {@code uri} using listen port 0 and opens it.
+     *
+     * @param uri target whose host and port connections are forwarded to
+     * @throws Exception if the proxy cannot be opened
+     */
+    public SocketProxy(URI uri) throws Exception {
+        this(0, uri);
+    }
+
+    /**
+     * Creates a proxy in front of {@code uri} listening on {@code port} and opens it.
+     *
+     * @param port port to listen on
+     * @param uri target whose host and port connections are forwarded to
+     * @throws Exception if the proxy cannot be opened
+     */
+    public SocketProxy(int port, URI uri) throws Exception {
+        this.target = uri;
+        this.listenPort = port;
+        open();
+    }
+
+ protected void open() throws Exception {
+ if (proxyUrl == null) {
+ serverSocket = new ServerSocket(listenPort);
+ proxyUrl = urlFromSocket(target, serverSocket);
+ } else {
+ serverSocket = new ServerSocket(proxyUrl.getPort());
+ }
+ acceptor = new Acceptor(serverSocket, target);
+ new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
+ }
+
+    /**
+     * @return the proxy's own URL: the target URI with its port replaced by the port the
+     *         proxy listens on
+     */
+    public URI getUrl() {
+        return this.proxyUrl;
+    }
+
+    /**
+     * Closes all proxy connections and then the acceptor.
+     */
+    public void close() {
+        // Iterate over a copy: closing a connection removes it from the live list.
+        final List<Connection> snapshot;
+        synchronized (connections) {
+            snapshot = new ArrayList<Connection>(connections);
+        }
+        LOG.info("close, numConnectons=" + snapshot.size());
+        for (Connection connection : snapshot) {
+            closeConnection(connection);
+        }
+        acceptor.close();
+    }
+
+    /**
+     * Called after a {@link #close()} to restart the acceptor on the same port.
+     *
+     * A failure does not propagate to the caller, but it is logged as a warning: if the
+     * port cannot be rebound the proxy stays down, and without a visible message that is
+     * hard to tell apart from a bridge that simply fails to reconnect.
+     */
+    public void reopen() {
+        LOG.info("reopen");
+        try {
+            open();
+        } catch (Exception e) {
+            LOG.warn("exception on reopen url:" + getUrl(), e);
+        }
+    }
+
+    /**
+     * Pauses accepting new connections and data transfer through existing proxy
+     * connections. All sockets remain open.
+     */
+    public void pause() {
+        synchronized (connections) {
+            LOG.info("pause, numConnectons=" + connections.size());
+            // stop the acceptor first, then every established connection
+            acceptor.pause();
+            for (Connection established : connections) {
+                established.pause();
+            }
+        }
+    }
+
+    /**
+     * Continues after a {@link #pause()}: resumes every established connection and then
+     * the acceptor.
+     */
+    public void goOn() {
+        synchronized (connections) {
+            LOG.info("goOn, numConnectons=" + connections.size());
+            for (Connection established : connections) {
+                established.goOn();
+            }
+        }
+        // the acceptor is resumed outside the connections lock
+        acceptor.goOn();
+    }
+
+    /**
+     * Closes one proxy connection, logging (at debug level) rather than propagating any
+     * failure so that the remaining connections can still be closed.
+     */
+    private void closeConnection(Connection c) {
+        try {
+            c.close();
+        } catch (Exception closeFailure) {
+            LOG.debug("exception on close of: " + c, closeFailure);
+        }
+    }
+
+    /**
+     * Builds the proxy URL: a copy of {@code uri} in which only the port is replaced by
+     * the local port of {@code serverSocket}.
+     *
+     * @param uri the target URI to copy scheme, user info, host, path, query and fragment from
+     * @param serverSocket the proxy's listening socket
+     * @return the URI to use to connect through the proxy
+     * @throws Exception if the components do not form a valid URI
+     */
+    private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
+        // named differently from the listenPort field to avoid shadowing it
+        int boundPort = serverSocket.getLocalPort();
+
+        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), boundPort,
+                uri.getPath(), uri.getQuery(), uri.getFragment());
+    }
+
+    /**
+     * One proxied connection: the socket accepted by the proxy, the socket opened to the
+     * target, and the two {@link Pump} threads that copy bytes between them.
+     */
+    public class Connection {
+
+        private Socket receiveSocket;
+        private Socket sendSocket;
+        private Pump requestThread;
+        private Pump responseThread;
+
+        /**
+         * Opens a socket to the target and starts pumping data in both directions.
+         *
+         * @param socket the socket accepted by the proxy; it is closed here if the target
+         *        cannot be reached
+         * @param target URI whose host and port identify the endpoint to forward to
+         * @throws Exception if the socket to the target cannot be opened
+         */
+        public Connection(Socket socket, URI target) throws Exception {
+            receiveSocket = socket;
+            try {
+                sendSocket = new Socket(target.getHost(), target.getPort());
+            } catch (Exception e) {
+                // Otherwise the accepted socket would stay open with nothing pumping it.
+                closeQuietly(receiveSocket);
+                throw e;
+            }
+            linkWithThreads(receiveSocket, sendSocket);
+            LOG.info("proxy connection " + sendSocket);
+        }
+
+        /** Resumes both pumps after a {@link #pause()}. */
+        public void goOn() {
+            responseThread.goOn();
+            requestThread.goOn();
+        }
+
+        /** Makes both pumps hold back data until {@link #goOn()}; the sockets stay open. */
+        public void pause() {
+            requestThread.pause();
+            responseThread.pause();
+        }
+
+        /**
+         * Removes this connection from the proxy's list and closes both sockets. The
+         * second socket is closed even if closing the first one fails.
+         *
+         * @throws Exception if closing either socket fails
+         */
+        public void close() throws Exception {
+            synchronized (connections) {
+                connections.remove(this);
+            }
+            try {
+                receiveSocket.close();
+            } finally {
+                sendSocket.close();
+            }
+        }
+
+        /** Creates and starts one pump per direction. */
+        private void linkWithThreads(Socket source, Socket dest) {
+            requestThread = new Pump(source, dest);
+            responseThread = new Pump(dest, source);
+            requestThread.start();
+            responseThread.start();
+        }
+
+        /** Closes a socket, ignoring any failure; used only on error paths. */
+        private void closeQuietly(Socket s) {
+            try {
+                s.close();
+            } catch (IOException ignored) {
+                // the original failure is the one being reported
+            }
+        }
+
+        /**
+         * Thread that copies bytes from one socket to the other until end of stream or
+         * an error, holding data back while paused.
+         */
+        public class Pump extends Thread {
+
+            protected Socket src;
+            private Socket destination;
+            // A latch with count 0 means "running"; a latch with count 1 means "paused".
+            private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+            public Pump(Socket source, Socket dest) {
+                super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort());
+                src = source;
+                destination = dest;
+                pause.set(new CountDownLatch(0));
+            }
+
+            /**
+             * Installs a fresh closed latch. Calling this twice without a goOn() in between
+             * replaces the latch, so a thread already waiting on the first one is not
+             * released by a later goOn().
+             */
+            public void pause() {
+                pause.set(new CountDownLatch(1));
+            }
+
+            /** Opens the current latch, releasing a pump blocked in {@link #run()}. */
+            public void goOn() {
+                pause.get().countDown();
+            }
+
+            @Override
+            public void run() {
+                byte[] buf = new byte[1024];
+                try {
+                    InputStream in = src.getInputStream();
+                    OutputStream out = destination.getOutputStream();
+                    while (true) {
+                        int len = in.read(buf);
+                        if (len == -1) {
+                            // NOTE(review): end of stream stops this pump but leaves both
+                            // sockets open - confirm this is intended.
+                            break;
+                        }
+                        // data already read is held here while the proxy is paused
+                        pause.get().await();
+                        out.write(buf, 0, len);
+                    }
+                } catch (Exception e) {
+                    LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
+                    try {
+                        // a failure in either direction tears down the whole connection
+                        Connection.this.close();
+                    } catch (Exception ignore) {
+                        // already failing; nothing more to do
+                    }
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Accept loop of the proxy: waits for incoming connections on the listening socket
+     * and creates a {@link Connection} to the target for each one. Can be paused.
+     */
+    public class Acceptor implements Runnable {
+
+        private ServerSocket socket;
+        private URI target;
+        // A latch with count 0 means "running"; a latch with count 1 means "paused".
+        private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+
+        /**
+         * @param serverSocket bound listening socket to accept from
+         * @param uri target that accepted connections are forwarded to
+         */
+        public Acceptor(ServerSocket serverSocket, URI uri) {
+            socket = serverSocket;
+            target = uri;
+            pause.set(new CountDownLatch(0));
+            try {
+                // accept() must time out so the loop can notice a closed socket or a pause
+                socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
+            } catch (SocketException e) {
+                LOG.warn("could not set accept timeout on " + socket, e);
+            }
+        }
+
+        /** Makes the loop stop accepting (after the accept in progress) until goOn(). */
+        public void pause() {
+            pause.set(new CountDownLatch(1));
+        }
+
+        /** Opens the current latch, letting a paused loop continue. */
+        public void goOn() {
+            pause.get().countDown();
+        }
+
+        public void run() {
+            try {
+                while (!socket.isClosed()) {
+                    pause.get().await();
+                    Socket source;
+                    try {
+                        source = socket.accept();
+                    } catch (SocketTimeoutException expected) {
+                        // nothing arrived within the timeout; re-check closed/paused state
+                        continue;
+                    }
+                    LOG.info("accepted " + source);
+                    try {
+                        // Construct and register under the lock, so that a close()
+                        // triggered by an early pump failure cannot run before the add.
+                        synchronized (connections) {
+                            connections.add(new Connection(source, target));
+                        }
+                    } catch (Exception e) {
+                        // One unreachable target must not end the accept loop or leak
+                        // the socket that was just accepted.
+                        LOG.warn("could not proxy " + source + " to " + target, e);
+                        try {
+                            source.close();
+                        } catch (IOException ignored) {
+                            // best effort
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // reached when the listening socket is closed or the thread is interrupted
+                LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+            }
+        }
+
+        /**
+         * Closes the listening socket and releases the loop if it is paused, so that the
+         * acceptor thread can terminate. The release happens even if the close fails.
+         */
+        public void close() {
+            try {
+                socket.close();
+            } catch (IOException ignored) {
+                // nothing useful to do; the loop is released below regardless
+            } finally {
+                goOn();
+            }
+        }
+    }
+}
+
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
|