Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 38431 invoked from network); 10 Oct 2008 13:12:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Oct 2008 13:12:21 -0000 Received: (qmail 5438 invoked by uid 500); 10 Oct 2008 13:12:20 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 5415 invoked by uid 500); 10 Oct 2008 13:12:20 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 5406 invoked by uid 99); 10 Oct 2008 13:12:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Oct 2008 06:12:20 -0700 X-ASF-Spam-Status: No, hits=-1999.9 required=10.0 tests=ALL_TRUSTED,DNS_FROM_SECURITYSAGE X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Oct 2008 13:11:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 01C5B23888AF; Fri, 10 Oct 2008 06:11:52 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r703447 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/ test/java/org/apache/activemq/network/ test/java/org/apache/activemq/usecases... Date: Fri, 10 Oct 2008 13:11:50 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081010131152.01C5B23888AF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Fri Oct 10 06:11:49 2008 New Revision: 703447 URL: http://svn.apache.org/viewvc?rev=703447&view=rev Log: revert/rework AMQ-1521, fix for AMQ-1973; new test case and SocketProxy that can be used to simulate network outage Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=703447&r1=703446&r2=703447&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Oct 10 06:11:49 2008 @@ -420,7 +420,14 @@ if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) { serviceRemoteConsumerAdvisory(message.getDataStructure()); } else { - localBroker.oneway(message); + if (message.isResponseRequired()) { + Response reply = new Response(); + reply.setCorrelationId(message.getCommandId()); + localBroker.oneway(message); + remoteBroker.oneway(reply); + } else { + localBroker.oneway(message); + } } } else { switch (command.getDataStructureType()) { @@ -436,6 +443,10 @@ if (LOG.isDebugEnabled()) { LOG.debug("Ignoring ConsumerInfo: "+ command); } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Adding ConsumerInfo: "+ command); + } } } else { // received a subscription whilst stopping @@ -606,10 +617,10 @@ Message message = configureMessage(md); if (trace) { LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message); - LOG.trace("cameFromRemote = "+cameFromRemote); + LOG.trace("cameFromRemote = "+cameFromRemote + ", repsonseRequired = " + message.isResponseRequired()); } - if (!message.isResponseRequired() || isDuplex()) { + if (!message.isResponseRequired()) { // If the message was originally sent using async // send, we will preserve that QOS Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=703447&r1=703446&r2=703447&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri Oct 10 06:11:49 2008 @@ -177,7 +177,7 @@ * reads packets from a Socket */ public void run() { - LOG.trace("TCP consumer thread starting"); + LOG.trace("TCP consumer thread for " + this + " starting"); this.runnerThread=Thread.currentThread(); try { while (!isStopped()) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=703447&r1=703446&r2=703447&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Fri Oct 10 06:11:49 2008 @@ -269,6 +269,7 @@ for (int i = 0; i < count; i++) { TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i); producer.send(msg); + onSend(i, msg); } producer.close(); @@ -277,6 +278,9 @@ brokerItem.connections.remove(conn); } + protected void onSend(int i, TextMessage msg) { + } + protected TextMessage createTextMessage(Session session, String initText) throws Exception { TextMessage msg = session.createTextMessage(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java?rev=703447&r1=703446&r2=703447&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java Fri Oct 10 06:11:49 2008 @@ -95,7 +95,7 @@ assertEquals(0, countMbeans(broker, "stopped")); } - assertEquals(0, countMbeans(networkedBroker, "NetworkBridge")); + //assertEquals(0, countMbeans(networkedBroker, "NetworkBridge")); assertEquals(1, countMbeans(networkedBroker, "Connector")); assertEquals(0, countMbeans(networkedBroker, "Connection")); assertEquals(0, countMbeans(broker, "Connection")); Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=703447&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java Fri Oct 10 06:11:49 2008 @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.net.URI; +import java.util.List; + +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +import junit.framework.Test; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.SocketProxy; + + +public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport { + private static final int NETWORK_DOWN_TIME = 5000; + protected static final int MESSAGE_COUNT = 200; + private static final String HUB = "HubBroker"; + private static final String SPOKE = "SpokeBroker"; + private SocketProxy socketProxy; + private long networkDownTimeStart; + public boolean useDuplexNetworkBridge; + public boolean sumulateStalledNetwork; + + + public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() { + addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE, Boolean.FALSE} ); + addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } ); + } + + public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception { + bridgeBrokers(SPOKE, HUB); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer client = createConsumer(HUB, dest); + + // allow subscription information to flow back to Spoke + sleep(600); + + // Send messages + sendMessages(SPOKE, dest, MESSAGE_COUNT); + + MessageIdList msgs = getConsumerMessages(HUB, client); + msgs.waitForMessagesToArrive(MESSAGE_COUNT); + + assertTrue("At least message " + MESSAGE_COUNT + + " must be recieved, duplicates are expected, count=" + msgs.getMessageCount(), + MESSAGE_COUNT <= msgs.getMessageCount()); + } + + + @Override + protected void startAllBrokers() throws Exception { + // Ensure HUB is started first so bridge will be active from the get go + BrokerItem brokerItem = brokers.get(HUB); + brokerItem.broker.start(); + brokerItem = brokers.get(SPOKE); + brokerItem.broker.start(); + sleep(600); + } + + public void setUp() throws Exception { + networkDownTimeStart = 0; + super.setAutoFail(true); + super.setUp(); + final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"; + createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options)); + createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options)); + } + + public static Test suite() { + return suite(BrokerQueueNetworkWithDisconnectTest.class); + } + + @Override + protected void onSend(int i, TextMessage msg) { + sleep(50); + if (i == 50 || i == 150) { + if (sumulateStalledNetwork) { + socketProxy.pause(); + } else { + socketProxy.close(); + } + networkDownTimeStart = System.currentTimeMillis(); + } else if (networkDownTimeStart > 0) { + // restart after NETWORK_DOWN_TIME seconds + if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis()) { + if (sumulateStalledNetwork) { + socketProxy.goOn(); + } else { + socketProxy.reopen(); + } + networkDownTimeStart = 0; + } else { + // slow message production to allow bridge to recover and limit message duplication + sleep(500); + } + } + super.onSend(i, msg); + } + + private void sleep(int milliSecondTime) { + try { + Thread.sleep(milliSecondTime); + } catch (InterruptedException igonred) { + } + } + + + @Override + protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI; + if (!transportConnectors.isEmpty()) { + remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri(); + socketProxy = new SocketProxy(remoteURI); + DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + socketProxy.getUrl() + + "?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000)?useExponentialBackOff=false")); + connector.setDynamicOnly(dynamicOnly); + connector.setNetworkTTL(networkTTL); + localBroker.addNetworkConnector(connector); + maxSetupTime = 2000; + if (useDuplexNetworkBridge) { + connector.setDuplex(true); + } + return connector; + } else { + throw new Exception("Remote broker has no registered connectors."); + } + + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=703447&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Fri Oct 10 06:11:49 2008 @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class SocketProxy { + + private static final transient Log LOG = LogFactory.getLog(SocketProxy.class); + + public static final int ACCEPT_TIMEOUT_MILLIS = 1000; + + private URI proxyUrl; + private URI target; + private Acceptor acceptor; + private ServerSocket serverSocket; + + public List connections = new LinkedList(); + + private int listenPort = 0; + + public SocketProxy(URI uri) throws Exception { + this(0, uri); + } + + public SocketProxy(int port, URI uri) throws Exception { + listenPort = port; + target = uri; + open(); + } + + protected void open() throws Exception { + if (proxyUrl == null) { + serverSocket = new ServerSocket(listenPort); + proxyUrl = urlFromSocket(target, serverSocket); + } else { + serverSocket = new ServerSocket(proxyUrl.getPort()); + } + acceptor = new Acceptor(serverSocket, target); + new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start(); + } + + public URI getUrl() { + return proxyUrl; + } + + /* + * close all proxy connections and acceptor + */ + public void close() { + List connections; + synchronized(this.connections) { + connections = new ArrayList(this.connections); + } + LOG.info("close, numConnectons=" + connections.size()); + for (Connection con : connections) { + closeConnection(con); + } + acceptor.close(); + } + + /* + * called after a close to restart the acceptor on the same port + */ + public void reopen() { + LOG.info("reopen"); + try { + open(); + } catch (Exception e) { + LOG.debug("exception on reopen url:" + getUrl(), e); + } + } + + /* + * pause accepting new connecitons and data transfer through existing proxy + * connections. All sockets remain open + */ + public void pause() { + synchronized(connections) { + LOG.info("pause, numConnectons=" + connections.size()); + acceptor.pause(); + for (Connection con : connections) { + con.pause(); + } + } + } + + /* + * continue after pause + */ + public void goOn() { + synchronized(connections) { + LOG.info("goOn, numConnectons=" + connections.size()); + for (Connection con : connections) { + con.goOn(); + } + } + acceptor.goOn(); + } + + private void closeConnection(Connection c) { + try { + c.close(); + } catch (Exception e) { + LOG.debug("exception on close of: " + c, e); + } + } + + private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception { + int listenPort = serverSocket.getLocalPort(); + + return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment()); + } + + public class Connection { + + private Socket receiveSocket; + private Socket sendSocket; + private Pump requestThread; + private Pump responseThread; + + public Connection(Socket socket, URI target) throws Exception { + receiveSocket = socket; + sendSocket = new Socket(target.getHost(), target.getPort()); + linkWithThreads(receiveSocket, sendSocket); + LOG.info("proxy connection " + sendSocket); + } + + public void goOn() { + responseThread.goOn(); + requestThread.goOn(); + } + + public void pause() { + requestThread.pause(); + responseThread.pause(); + } + + public void close() throws Exception { + synchronized(connections) { + connections.remove(this); + } + receiveSocket.close(); + sendSocket.close(); + } + + private void linkWithThreads(Socket source, Socket dest) { + requestThread = new Pump(source, dest); + responseThread = new Pump(dest, source); + requestThread.start(); + responseThread.start(); + } + + public class Pump extends Thread { + + protected Socket src; + private Socket destination; + private AtomicReference pause = new AtomicReference(); + + public Pump(Socket source, Socket dest) { + super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort()); + src = source; + destination = dest; + pause.set(new CountDownLatch(0)); + } + + public void pause() { + pause.set(new CountDownLatch(1)); + } + + public void goOn() { + pause.get().countDown(); + } + + public void run() { + byte[] buf = new byte[1024]; + try { + InputStream in = src.getInputStream(); + OutputStream out = destination.getOutputStream(); + while (true) { + int len = in.read(buf); + if (len == -1) { + break; + } + pause.get().await(); + out.write(buf, 0, len); + } + } catch (Exception e) { + LOG.debug("read/write failed, reason: " + e.getLocalizedMessage()); + try { + close(); + } catch (Exception ignore) { + } + } + } + + } + } + + public class Acceptor implements Runnable { + + private ServerSocket socket; + private URI target; + private AtomicReference pause = new AtomicReference(); + + + public Acceptor(ServerSocket serverSocket, URI uri) { + socket = serverSocket; + target = uri; + pause.set(new CountDownLatch(0)); + try { + socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS); + } catch (SocketException e) { + e.printStackTrace(); + } + } + + public void pause() { + pause.set(new CountDownLatch(1)); + } + + public void goOn() { + pause.get().countDown(); + } + + public void run() { + try { + while(!socket.isClosed()) { + pause.get().await(); + try { + Socket source = socket.accept(); + LOG.info("accepted " + source); + synchronized(connections) { + connections.add(new Connection(source, target)); + } + } catch (SocketTimeoutException expected) { + } + } + } catch (Exception e) { + LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage()); + } + } + + public void close() { + try { + socket.close(); + goOn(); + } catch (IOException ignored) { + } + } + } +} + Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java ------------------------------------------------------------------------------ svn:keywords = Rev Date