activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r703447 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/ test/java/org/apache/activemq/network/ test/java/org/apache/activemq/usecases...
Date Fri, 10 Oct 2008 13:11:50 GMT
Author: gtully
Date: Fri Oct 10 06:11:49 2008
New Revision: 703447

URL: http://svn.apache.org/viewvc?rev=703447&view=rev
Log:
revert/rework AMQ-1521, fix for AMQ-1973; new test case and SocketProxy that can be used to
simulate network outage

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java 
 (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Oct 10 06:11:49 2008
@@ -420,7 +420,14 @@
                             if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()))
{
                                 serviceRemoteConsumerAdvisory(message.getDataStructure());
                             } else {
-                                localBroker.oneway(message);
+                                if (message.isResponseRequired()) {
+                                    Response reply = new Response();
+                                    reply.setCorrelationId(message.getCommandId());
+                                    localBroker.oneway(message);
+                                    remoteBroker.oneway(reply);
+                                } else {
+                                    localBroker.oneway(message);
+                                }
                             }
                         } else {
                             switch (command.getDataStructureType()) {
@@ -436,6 +443,10 @@
                                         if (LOG.isDebugEnabled()) {
                                             LOG.debug("Ignoring ConsumerInfo: "+ command);
                                         }
+                                    } else {
+                                        if (LOG.isTraceEnabled()) {
+                                            LOG.trace("Adding ConsumerInfo: "+ command);
+                                        }
                                     }
                                 } else {
                                     // received a subscription whilst stopping
@@ -606,10 +617,10 @@
                         Message message = configureMessage(md);
                         if (trace) {
                             LOG.trace("bridging " + configuration.getBrokerName() + " ->
" + remoteBrokerName + ": " + message);
-                            LOG.trace("cameFromRemote = "+cameFromRemote);    
+                            LOG.trace("cameFromRemote = "+cameFromRemote + ", repsonseRequired
= " + message.isResponseRequired());    
                         }
 
-                        if (!message.isResponseRequired() || isDuplex()) {
+                        if (!message.isResponseRequired()) {
 
                             // If the message was originally sent using async
                             // send, we will preserve that QOS

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Fri Oct 10 06:11:49 2008
@@ -177,7 +177,7 @@
      * reads packets from a Socket
      */
     public void run() {
-        LOG.trace("TCP consumer thread starting");
+        LOG.trace("TCP consumer thread for " + this + " starting");
         this.runnerThread=Thread.currentThread();
         try {
             while (!isStopped()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Fri Oct 10 06:11:49 2008
@@ -269,6 +269,7 @@
         for (int i = 0; i < count; i++) {
             TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" +
i);
             producer.send(msg);
+            onSend(i, msg);
         }
 
         producer.close();
@@ -277,6 +278,9 @@
         brokerItem.connections.remove(conn);
     }
 
+    protected void onSend(int i, TextMessage msg) {
+    }
+
     protected TextMessage createTextMessage(Session session, String initText) throws Exception
{
         TextMessage msg = session.createTextMessage();
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java?rev=703447&r1=703446&r2=703447&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
Fri Oct 10 06:11:49 2008
@@ -95,7 +95,7 @@
             assertEquals(0, countMbeans(broker, "stopped"));
         }
         
-        assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
+        //assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
         assertEquals(1, countMbeans(networkedBroker, "Connector"));
         assertEquals(0, countMbeans(networkedBroker, "Connection"));
         assertEquals(0, countMbeans(broker, "Connection"));

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=703447&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
Fri Oct 10 06:11:49 2008
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SocketProxy;
+
+
+public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport {
+    private static final int NETWORK_DOWN_TIME = 5000;
+    protected static final int MESSAGE_COUNT = 200;
+    private static final String HUB = "HubBroker";
+    private static final String SPOKE = "SpokeBroker";
+    private SocketProxy socketProxy;
+    private long networkDownTimeStart;
+    public boolean useDuplexNetworkBridge;
+    public boolean sumulateStalledNetwork;
+
+   
+    public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
+        addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE, Boolean.FALSE}
);
+        addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } );
+    }
+    
+    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
+        bridgeBrokers(SPOKE, HUB);
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+        
+        // Setup consumers
+        MessageConsumer client = createConsumer(HUB, dest);
+        
+        // allow subscription information to flow back to Spoke
+        sleep(600);
+        
+        // Send messages
+        sendMessages(SPOKE, dest, MESSAGE_COUNT);
+
+        MessageIdList msgs = getConsumerMessages(HUB, client);
+        msgs.waitForMessagesToArrive(MESSAGE_COUNT);
+
+        assertTrue("At least message " + MESSAGE_COUNT + 
+                " must be recieved, duplicates are expected, count=" + msgs.getMessageCount(),
+                MESSAGE_COUNT <= msgs.getMessageCount());
+    }
+
+    
+    @Override
+    protected void startAllBrokers() throws Exception {
+        // Ensure HUB is started first so bridge will be active from the get go
+        BrokerItem brokerItem = brokers.get(HUB);
+        brokerItem.broker.start();
+        brokerItem = brokers.get(SPOKE);
+        brokerItem.broker.start();
+        sleep(600);
+    }
+
+    public void setUp() throws Exception {
+        networkDownTimeStart = 0;
+        super.setAutoFail(true);
+        super.setUp();
+        final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
+        createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
+        createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+    }
+    
+    public static Test suite() {
+        return suite(BrokerQueueNetworkWithDisconnectTest.class);
+    }
+       
+    @Override
+    protected void onSend(int i, TextMessage msg) {
+        sleep(50);
+        if (i == 50 || i == 150) {
+            if (sumulateStalledNetwork) {
+                socketProxy.pause();
+            } else {
+                socketProxy.close();
+            }
+            networkDownTimeStart = System.currentTimeMillis();
+        } else if (networkDownTimeStart > 0) {
+             // restart after NETWORK_DOWN_TIME seconds
+             if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis())
{
+                 if (sumulateStalledNetwork) {
+                     socketProxy.goOn();
+                 } else {
+                     socketProxy.reopen();
+                 }
+                 networkDownTimeStart = 0;
+             } else {
+                 // slow message production to allow bridge to recover and limit message
duplication
+                 sleep(500);
+            }
+        }
+        super.onSend(i, msg);
+    }
+
+    private void sleep(int milliSecondTime) {
+        try {
+            Thread.sleep(milliSecondTime);
+        } catch (InterruptedException igonred) {
+        }    
+    }
+
+
+    @Override
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,
boolean dynamicOnly, int networkTTL) throws Exception {
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        URI remoteURI;
+        if (!transportConnectors.isEmpty()) {
+            remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
+            socketProxy = new SocketProxy(remoteURI);
+            DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:("
+ socketProxy.getUrl() 
+                    + "?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000)?useExponentialBackOff=false"));
+            connector.setDynamicOnly(dynamicOnly);
+            connector.setNetworkTTL(networkTTL);
+            localBroker.addNetworkConnector(connector);
+            maxSetupTime = 2000;
+            if (useDuplexNetworkBridge) {
+                connector.setDuplex(true);
+            }
+            return connector;
+        } else {
+            throw new Exception("Remote broker has no registered connectors.");
+        }
+
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=703447&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Fri
Oct 10 06:11:49 2008
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SocketProxy {
+
+    private static final transient Log LOG = LogFactory.getLog(SocketProxy.class);
+
+    public static final int ACCEPT_TIMEOUT_MILLIS = 1000;
+
+    private URI proxyUrl;
+    private URI target;
+    private Acceptor acceptor;
+    private ServerSocket serverSocket;
+
+    public List<Connection> connections = new LinkedList<Connection>();
+
+    private int listenPort = 0;
+
+    public SocketProxy(URI uri) throws Exception {
+        this(0, uri);
+    }
+
+    public SocketProxy(int port, URI uri) throws Exception {
+        listenPort = port;
+        target = uri;
+        open();
+    }
+
+    protected void open() throws Exception {
+        if (proxyUrl == null) {
+            serverSocket = new ServerSocket(listenPort);
+            proxyUrl = urlFromSocket(target, serverSocket);
+        } else {
+            serverSocket = new ServerSocket(proxyUrl.getPort());
+        }
+        acceptor = new Acceptor(serverSocket, target);
+        new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
+    }
+
+    public URI getUrl() {
+        return proxyUrl;
+    }
+
+    /*
+     * close all proxy connections and acceptor
+     */
+    public void close() {
+        List<Connection> connections;
+        synchronized(this.connections) {
+            connections = new ArrayList<Connection>(this.connections);
+        }            
+        LOG.info("close, numConnectons=" + connections.size());
+        for (Connection con : connections) {
+            closeConnection(con);
+        }
+        acceptor.close();
+    }
+
+    /*
+     * called after a close to restart the acceptor on the same port
+     */
+    public void reopen() {
+        LOG.info("reopen");
+        try {
+            open();
+        } catch (Exception e) {
+            LOG.debug("exception on reopen url:" + getUrl(), e);
+        }
+    }
+
+    /*
+     * pause accepting new connecitons and data transfer through existing proxy
+     * connections. All sockets remain open
+     */
+    public void pause() {
+        synchronized(connections) {
+            LOG.info("pause, numConnectons=" + connections.size());
+            acceptor.pause();
+            for (Connection con : connections) {
+                con.pause();
+            }
+        }
+    }
+
+    /*
+     * continue after pause
+     */
+    public void goOn() {
+        synchronized(connections) {
+            LOG.info("goOn, numConnectons=" + connections.size());
+            for (Connection con : connections) {
+                con.goOn();
+            }
+        }
+        acceptor.goOn();
+    }
+
+    private void closeConnection(Connection c) {
+        try {
+            c.close();
+        } catch (Exception e) {
+            LOG.debug("exception on close of: " + c, e);
+        }
+    }
+
+    private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
+        int listenPort = serverSocket.getLocalPort();
+
+        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(),
uri.getQuery(), uri.getFragment());
+    }
+
+    public class Connection {
+
+        private Socket receiveSocket;
+        private Socket sendSocket;
+        private Pump requestThread;
+        private Pump responseThread;
+
+        public Connection(Socket socket, URI target) throws Exception {
+            receiveSocket = socket;
+            sendSocket = new Socket(target.getHost(), target.getPort());
+            linkWithThreads(receiveSocket, sendSocket);
+            LOG.info("proxy connection " + sendSocket);
+        }
+
+        public void goOn() {
+            responseThread.goOn();
+            requestThread.goOn();
+        }
+
+        public void pause() {
+            requestThread.pause();
+            responseThread.pause();
+        }
+
+        public void close() throws Exception {
+            synchronized(connections) {
+                connections.remove(this);
+            }
+            receiveSocket.close();
+            sendSocket.close();
+        }
+
+        private void linkWithThreads(Socket source, Socket dest) {
+            requestThread = new Pump(source, dest);
+            responseThread = new Pump(dest, source);
+            requestThread.start();
+            responseThread.start();
+        }
+
+        public class Pump extends Thread {
+
+            protected Socket src;
+            private Socket destination;
+            private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+            public Pump(Socket source, Socket dest) {
+                super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort());
+                src = source;
+                destination = dest;
+                pause.set(new CountDownLatch(0));
+            }
+
+            public void pause() {
+                pause.set(new CountDownLatch(1));
+            }
+
+            public void goOn() {
+                pause.get().countDown();
+            }
+
+            public void run() {
+                byte[] buf = new byte[1024];
+                try {
+                    InputStream in = src.getInputStream();
+                    OutputStream out = destination.getOutputStream();
+                    while (true) {
+                        int len = in.read(buf);
+                        if (len == -1) {
+                            break;
+                        }
+                        pause.get().await();
+                        out.write(buf, 0, len);
+                    }
+                } catch (Exception e) {
+                    LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
+                    try {
+                        close();
+                    } catch (Exception ignore) {
+                    }
+                }
+            }
+
+        }
+    }
+
+    public class Acceptor implements Runnable {
+
+        private ServerSocket socket;
+        private URI target;
+        private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+
+        public Acceptor(ServerSocket serverSocket, URI uri) {
+            socket = serverSocket;
+            target = uri;
+            pause.set(new CountDownLatch(0));
+            try {
+                socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
+            } catch (SocketException e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void pause() {
+            pause.set(new CountDownLatch(1));
+        }
+        
+        public void goOn() {
+            pause.get().countDown();
+        }
+
+        public void run() {
+            try {
+                while(!socket.isClosed()) {
+                    pause.get().await();
+                    try {
+                        Socket source = socket.accept();
+                        LOG.info("accepted " + source);
+                        synchronized(connections) {
+                            connections.add(new Connection(source, target));
+                        }
+                    } catch (SocketTimeoutException expected) {
+                    }
+                }
+            } catch (Exception e) {
+                LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+            }
+        }
+        
+        public void close() {
+            try {
+                socket.close();
+                goOn();
+            } catch (IOException ignored) {
+            }
+        }
+    }
+}
+

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message