activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r980014 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/usecases/ test/resources/META-INF/servic...
Date Wed, 28 Jul 2010 10:43:27 GMT
Author: gtully
Date: Wed Jul 28 10:43:27 2010
New Revision: 980014

URL: http://svn.apache.org/viewvc?rev=980014&view=rev
Log:
apply patch from https://issues.apache.org/activemq/browse/AMQ-2774

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
  (with props)
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Jul 28 10:43:27 2010
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,7 @@ import javax.transaction.xa.XAResource;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.CommandTypes;
@@ -80,7 +82,6 @@ import org.apache.activemq.state.Consume
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
 import org.apache.activemq.state.TransactionState;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -88,6 +89,7 @@ import org.apache.activemq.transaction.T
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
@@ -96,7 +98,7 @@ import org.apache.activemq.util.URISuppo
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import static org.apache.activemq.thread.DefaultThreadPools.*;
+import static org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
 /**
  * @version $Revision: 1.8 $
  */
@@ -149,6 +151,7 @@ public class TransportConnection impleme
     private final TaskRunnerFactory taskRunnerFactory;
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
     private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
+    private BrokerId	duplexRemoteBrokerId;
 
     /**
      * @param connector
@@ -1178,6 +1181,20 @@ public class TransportConnection impleme
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...
             try {
+                // We first look if existing network connection already exists for the same broker Id
+                // It's possible in case of brief network fault to have this transport connector side of the connection always active
+                // and the duplex network connector side wanting to open a new one
+                // In this case, the old connection must be broken
+                BrokerId	remoteBrokerId = info.getBrokerId();
+                setDuplexRemoteBrokerId(remoteBrokerId);
+                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
+                for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
+            		TransportConnection c = iter.next();
+                    if ((c != this) && (remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) {
+                        LOG.warn("An existing duplex active connection already exists for this broker (" + remoteBrokerId + "). Stopping it.");
+                        c.stop();
+                    }
+                }
                 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
                 Map<String, String> props = createMap(properties);
                 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
@@ -1198,6 +1215,9 @@ public class TransportConnection impleme
                 duplexBridge.duplexStart(this, brokerInfo, info);
                 LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
                 return null;
+            } catch (TransportDisposedIOException e) {
+                LOG.warn("Duplex Bridge back to " + info.getBrokerName() + " was correctly stopped before it was correctly started.");
+                return null;
             } catch (Exception e) {
                 LOG.error("Creating duplex network bridge", e);
             }
@@ -1391,4 +1411,12 @@ public class TransportConnection impleme
     protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
         return connectionStateRegister.lookupConnectionState(connectionId);
     }
+
+    protected synchronized void setDuplexRemoteBrokerId(BrokerId remoteBrokerId) {
+        this.duplexRemoteBrokerId = remoteBrokerId;
+    }
+
+    protected synchronized BrokerId getDuplexRemoteBrokerId() {
+        return this.duplexRemoteBrokerId;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Jul 28 10:43:27 2010
@@ -233,14 +233,19 @@ public abstract class DemandForwardingBr
                 // initiator side of duplex network
                 remoteBrokerNameKnownLatch.await();
             }
-            try {
-                triggerRemoteStartBridge();
-            } catch (IOException e) {
-                LOG.warn("Caught exception from remote start", e);
-            }
-            NetworkBridgeListener l = this.networkBridgeListener;
-            if (l != null) {
-                l.onStart(this);
+            if (!disposed.get()) {
+                try {
+                    triggerRemoteStartBridge();
+                } catch (IOException e) {
+                    LOG.warn("Caught exception from remote start", e);
+                }
+                NetworkBridgeListener l = this.networkBridgeListener;
+                if (l != null) {
+                    l.onStart(this);
+                }
+    	    } else {
+                LOG.warn ("Bridge was disposed before the start() method was fully executed.");
+                throw new TransportDisposedIOException();
             }
         }
     }
@@ -285,30 +290,38 @@ public abstract class DemandForwardingBr
                 }
                 remoteBrokerNameKnownLatch.await();
 
-                localConnectionInfo = new ConnectionInfo();
-                localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
-                localConnectionInfo.setClientId(localClientId);
-                localConnectionInfo.setUserName(configuration.getUserName());
-                localConnectionInfo.setPassword(configuration.getPassword());
-                Transport originalTransport = remoteBroker;
-                while (originalTransport instanceof TransportFilter) {
-                    originalTransport = ((TransportFilter) originalTransport).getNext();
-                }
-                if (originalTransport instanceof SslTransport) {
-                    X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
-                    localConnectionInfo.setTransportContext(peerCerts);
-                }
-                localBroker.oneway(localConnectionInfo);
+                if (!disposed.get()) {
+                    localConnectionInfo = new ConnectionInfo();
+                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+                    localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
+                    localConnectionInfo.setClientId(localClientId);
+                    localConnectionInfo.setUserName(configuration.getUserName());
+                    localConnectionInfo.setPassword(configuration.getPassword());
+                    Transport originalTransport = remoteBroker;
+                    while (originalTransport instanceof TransportFilter) {
+                        originalTransport = ((TransportFilter) originalTransport).getNext();
+                    }
+                    if (originalTransport instanceof SslTransport) {
+                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
+                        localConnectionInfo.setTransportContext(peerCerts);
+                    }
+                    localBroker.oneway(localConnectionInfo);
 
-                localSessionInfo = new SessionInfo(localConnectionInfo, 1);
-                localBroker.oneway(localSessionInfo);
+                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
+                    localBroker.oneway(localSessionInfo);
 
-                LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
+                    LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
 
+                } else {
+                    LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
+                }
                 startedLatch.countDown();
                 localStartedLatch.countDown();
-                setupStaticDestinations();
+                if (!disposed.get()) {
+                    setupStaticDestinations();
+                } else {
+                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
+                }
             }
         }
     }
@@ -408,6 +421,7 @@ public abstract class DemandForwardingBr
                 }
             }
             LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
+            remoteBrokerNameKnownLatch.countDown();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Jul 28 10:43:27 2010
@@ -21,13 +21,13 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -128,6 +128,8 @@ public class DiscoveryNetworkConnector e
             try {
                 bridge.start();
                 bridges.put(uri, bridge);
+    	    } catch (TransportDisposedIOException e) {
+                LOG.warn("Network bridge between: " + localURI + " and: " + uri + " was correctly stopped before it was correctly started.");
             } catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java?rev=980014&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java Wed Jul 28 10:43:27 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.Random;
+import javax.net.ServerSocketFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ */
+public class ServerSocketTstFactory extends ServerSocketFactory {
+    private static final Log LOG = LogFactory.getLog(ServerSocketTstFactory.class);
+
+    private class ServerSocketTst {
+
+	private final	ServerSocket	socket;
+
+	public ServerSocketTst(int port, Random rnd) throws IOException {
+		this.socket = ServerSocketFactory.getDefault().createServerSocket(port);
+	}
+
+	public ServerSocketTst(int port, int backlog, Random rnd) throws IOException {
+		this.socket = ServerSocketFactory.getDefault().createServerSocket(port, backlog);
+	}
+
+	public ServerSocketTst(int port, int backlog, InetAddress bindAddr, Random rnd) throws IOException {
+		this.socket = ServerSocketFactory.getDefault().createServerSocket(port, backlog, bindAddr);
+	}
+
+	public ServerSocket	getSocket() {
+		return this.socket;
+	}
+    };
+
+   private final Random	rnd;
+
+   public ServerSocketTstFactory() {
+	super();
+	LOG.info("Creating a new ServerSocketTstFactory");
+	this.rnd = new Random();
+   }
+
+   public ServerSocket createServerSocket(int port) throws IOException {
+	ServerSocketTst	sSock = new ServerSocketTst(port, this.rnd);
+	return sSock.getSocket();
+   }
+
+   public ServerSocket createServerSocket(int port, int backlog) throws IOException {
+	ServerSocketTst	sSock = new ServerSocketTst(port, backlog, this.rnd);
+	return sSock.getSocket();
+   }
+
+   public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
+	ServerSocketTst	sSock = new ServerSocketTst(port, backlog, ifAddress, this.rnd);
+	return sSock.getSocket();
+   }
+
+   private final static ServerSocketTstFactory server = new ServerSocketTstFactory();
+
+   public static ServerSocketTstFactory getDefault() {
+	return server;
+   }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java?rev=980014&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java Wed Jul 28 10:43:27 2010
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.net.SocketFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ *
+ * Automatically generated socket.close() calls to simulate network faults
+ */
+public class SocketTstFactory extends SocketFactory {
+    private static final Log LOG = LogFactory.getLog(SocketTstFactory.class);
+
+    private static final ConcurrentHashMap<InetAddress, Integer>	closeIter = new ConcurrentHashMap<InetAddress, Integer>();
+
+    private class SocketTst {
+
+	private class Bagot implements Runnable {
+		private Thread			processus;
+		private Random	rnd;
+		private Socket			socket;
+		private final InetAddress	address;
+
+		public Bagot(Random rnd, Socket socket, InetAddress address) {
+			this.processus  = new Thread(this, "Network Faults maker : undefined");
+			this.rnd	= rnd;
+			this.socket	= socket;
+			this.address	= address;
+		}
+
+		public void start() {
+			this.processus.setName("Network Faults maker : " + this.socket.toString());
+			this.processus.start();
+		}
+
+		public void run () {
+			int 	lastDelayVal;
+			Integer lastDelay;
+			while (!this.processus.isInterrupted()) {
+				if (!this.socket.isClosed()) {
+					try {
+						lastDelay = closeIter.get(this.address);
+						if (lastDelay == null) { 
+							lastDelayVal = 0;
+						}
+						else {
+							lastDelayVal = lastDelay.intValue();
+							if (lastDelayVal > 10)
+								lastDelayVal += 20;
+							else	lastDelayVal += 1;	
+						}
+
+						lastDelay = new Integer(lastDelayVal);
+
+						LOG.info("Trying to close client socket " + socket.toString() +  " in " + lastDelayVal + " milliseconds");
+
+						try {
+							Thread.sleep(lastDelayVal);
+						} catch (InterruptedException e) {
+							this.processus.interrupt();
+							Thread.currentThread().interrupt();
+						} catch (IllegalArgumentException e) {
+						}
+							
+						this.socket.close();
+						closeIter.put(this.address, lastDelay);
+						LOG.info("Client socket " + this.socket.toString() + " is closed.");
+					} catch (IOException e) {
+					}
+				}
+
+				this.processus.interrupt();
+			}
+		}
+	}
+
+	private	final Bagot		bagot;
+	private final Socket		socket;
+
+	public SocketTst(InetAddress address, int port, Random rnd) throws IOException {
+		this.socket = new Socket(address, port);
+		bagot = new Bagot(rnd, this.socket, address);
+	}
+
+	public SocketTst(InetAddress address, int port, InetAddress localAddr, int localPort, Random rnd) throws IOException {
+		this.socket = new Socket(address, port, localAddr, localPort);
+		bagot = new Bagot(rnd, this.socket, address);
+	}
+
+	public SocketTst(String address, int port, Random rnd) throws UnknownHostException, IOException {
+		this.socket = new Socket(address, port);
+		bagot = new Bagot(rnd, this.socket, InetAddress.getByName(address));
+	}
+
+	public SocketTst(String address, int port, InetAddress localAddr, int localPort, Random rnd) throws IOException {
+		this.socket = new Socket(address, port, localAddr, localPort);
+		bagot = new Bagot(rnd, this.socket, InetAddress.getByName(address));
+	}
+
+	public Socket getSocket() {
+		return this.socket;
+	}
+
+	public void startBagot() {
+		bagot.start();
+	}
+    };
+
+    private final Random		rnd;
+
+    public SocketTstFactory() {
+	super();
+	LOG.info("Creating a new SocketTstFactory");
+	this.rnd	= new Random();
+    }
+
+    public Socket createSocket(InetAddress host, int port) throws IOException {
+	SocketTst sockTst;
+	sockTst = new SocketTst(host, port, this.rnd);
+	sockTst.startBagot();
+	return sockTst.getSocket();
+    }
+
+    public Socket createSocket(InetAddress host, int port, InetAddress localAddress, int localPort) throws IOException {
+	SocketTst	sockTst;
+	sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
+	sockTst.startBagot();
+	return sockTst.getSocket();
+    }
+
+    public Socket createSocket(String host, int port) throws IOException {
+	SocketTst	sockTst;
+	sockTst = new SocketTst(host, port, this.rnd);
+	sockTst.startBagot();
+	return sockTst.getSocket();
+    }
+
+    public Socket createSocket(String host, int port, InetAddress localAddress, int localPort) throws IOException {
+	SocketTst	sockTst;
+	sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
+	sockTst.startBagot();
+	return sockTst.getSocket();
+    }
+
+    private final static SocketTstFactory client = new SocketTstFactory();
+
+    public static SocketFactory getDefault() {
+	return client;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java?rev=980014&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java Wed Jul 28 10:43:27 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.SocketFactory;
+
+/**
+ * An implementation of the {@link Transport} interface using raw tcp/ip
+ * 
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @version $Revision$
+ */
+public class TcpFaultyTransport extends TcpTransport implements Transport, Service, Runnable {
+
+    public TcpFaultyTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
+                        URI localLocation) throws UnknownHostException, IOException {
+	super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    /**
+     * @return pretty print of 'this'
+     */
+    public String toString() {
+        return "tcpfaulty://" + socket.getInetAddress() + ":" + socket.getPort();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java?rev=980014&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java Wed Jul 28 10:43:27 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.Socket;
+import java.net.ServerSocket;
+import java.net.InetAddress;
+
+import org.apache.activemq.transport.tcp.ServerSocketTstFactory;
+
+/**
+ * Factory that produces {@link TcpFaultyTransport} clients and
+ * {@link TcpFaultyTransportServer} servers. Sockets come from
+ * {@link SocketTstFactory} and {@link ServerSocketTstFactory}; per the original
+ * note on this class, the intent is automatically generated socket.close()
+ * calls to simulate network faults.
+ */
+public class TcpFaultyTransportFactory extends TcpTransportFactory {
+    private static final Log LOG = LogFactory.getLog(TcpFaultyTransportFactory.class);
+
+    /**
+     * Creates, configures and binds a faulty transport server for the given URI.
+     *
+     * @param location URI to bind; its query parameters are used as server options
+     * @return the bound server
+     * @throws IOException on bind failure, or wrapping a {@link URISyntaxException}
+     */
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> params = new HashMap<String, String>(URISupport.parseParamters(location));
+
+            TcpFaultyTransportServer faultyServer =
+                createTcpFaultyTransportServer(location, createServerSocketFactory());
+            // The same params map is handed to each step below in turn, so the
+            // original call order is kept exactly.
+            faultyServer.setWireFormatFactory(createWireFormatFactory(params));
+            IntrospectionSupport.setProperties(faultyServer, params);
+            Map<String, Object> transportParams = IntrospectionSupport.extractProperties(params, "transport.");
+            faultyServer.setTransportOption(transportParams);
+            faultyServer.bind();
+
+            return faultyServer;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Creates a client transport to <code>location</code>, using the URI path as
+     * the local bind location when it parses as one.
+     *
+     * @param location remote URI, optionally carrying a local location in its path
+     * @param wf       wire format for the new transport
+     * @return the new faulty transport
+     */
+    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+        URI localLocation = parseLocalLocation(location);
+        return createTcpFaultyTransport(wf, createSocketFactory(), location, localLocation);
+    }
+
+    /**
+     * Interprets the path of <code>location</code> as a local URI location.
+     *
+     * @return the local URI, or <code>null</code> when the path is empty or the
+     *         text after its first ':' is not an integer (a warning is logged
+     *         in the latter case)
+     */
+    private static URI parseLocalLocation(URI location) {
+        String path = location.getPath();
+        if (path == null || path.length() == 0) {
+            return null;
+        }
+        try {
+            // Validation only: the parsed port value itself is not used.
+            Integer.parseInt(path.substring(path.indexOf(':') + 1));
+            return new URI(location.getScheme() + ":/" + path);
+        } catch (Exception e) {
+            LOG.warn("path isn't a valid local location for TcpTransport to use", e);
+            return null;
+        }
+    }
+
+    /**
+     * Instantiation hook for the client transport.
+     */
+    protected TcpFaultyTransport createTcpFaultyTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new TcpFaultyTransport(wf, socketFactory, location, localLocation);
+    }
+
+    /**
+     * Instantiation hook for the transport server; passes this factory to it.
+     */
+    protected TcpFaultyTransportServer createTcpFaultyTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpFaultyTransportServer(this, location, serverSocketFactory);
+    }
+
+    /**
+     * @return the default {@link SocketTstFactory}
+     */
+    protected SocketFactory createSocketFactory() throws IOException {
+        return SocketTstFactory.getDefault();
+    }
+
+    /**
+     * @return the default {@link ServerSocketTstFactory}
+     */
+    protected ServerSocketFactory createServerSocketFactory() throws IOException {
+        return ServerSocketTstFactory.getDefault();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java?rev=980014&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activemq.util.ServiceListener;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+
+import javax.net.ServerSocketFactory;
+
+/**
+ * A TCP based implementation of {@link TransportServer} for the faulty-network
+ * tests: a {@link TcpTransportServer} that is constructed from a
+ * {@link TcpFaultyTransportFactory} and adds nothing but its own
+ * {@link #toString()}.
+ * 
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @version $Revision$
+ */
+public class TcpFaultyTransportServer extends TcpTransportServer implements ServiceListener {
+
+    /**
+     * Passes the factory, bind location and server socket factory straight to
+     * the {@link TcpTransportServer} constructor.
+     *
+     * @param transportFactory    factory that created this server
+     * @param location            URI this server is meant to bind to
+     * @param serverSocketFactory factory handed to the parent server
+     * @throws IOException        propagated from the parent constructor
+     * @throws URISyntaxException propagated from the parent constructor
+     */
+    public TcpFaultyTransportServer(TcpFaultyTransportFactory transportFactory, URI location,
+                        ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory);
+    }
+
+    /**
+     * @return pretty print of this: the string form of the bind location
+     *         (the text "null" when there is none)
+     */
+    @Override
+    public String toString() {
+        return String.valueOf(getBindLocation());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java?rev=980014&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SocketProxy;
+
+
+/**
+ * Bridges a spoke broker to a hub broker over multicast discovery, with both
+ * brokers listening on <code>tcpfaulty://</code> connectors, and checks that
+ * every message sent on the spoke reaches a consumer on the hub.
+ */
+public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTestSupport {
+    protected static final int MESSAGE_COUNT = 200;
+    private static final String HUB = "HubBroker";
+    private static final String SPOKE = "SpokeBroker";
+    // Shared by the network connector and the hub's transport connector so both
+    // sides always use the same multicast group.
+    private static final String DISCOVERY_URI = "multicast://default?group=TESTERIC";
+
+    // Both fields are referred to by name (as strings) in
+    // initCombosForTestSendOnAFaultyTransport, so their names, including the
+    // "sumulate" spelling, and their public visibility must not change.
+    public boolean useDuplexNetworkBridge;
+    public boolean sumulateStalledNetwork;
+
+    private TransportConnector mCastTrpConnector;
+
+    /**
+     * Registers the combinations for {@link #testSendOnAFaultyTransport()}:
+     * duplex bridge on and off, stalled-network flag always true.
+     */
+    public void initCombosForTestSendOnAFaultyTransport() {
+        addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE , Boolean.FALSE } );
+        addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } );
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages on the spoke and waits for at least
+     * that many to arrive at a consumer on the hub.
+     */
+    public void testSendOnAFaultyTransport() throws Exception {
+        bridgeBrokers(SPOKE, HUB);
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Setup consumers
+        MessageConsumer client = createConsumer(HUB, dest);
+
+        // allow subscription information to flow back to Spoke
+        sleep(600);
+
+        // Send messages
+        sendMessages(SPOKE, dest, MESSAGE_COUNT);
+
+        MessageIdList msgs = getConsumerMessages(HUB, client);
+        msgs.setMaximumDuration(200000L);
+        msgs.waitForMessagesToArrive(MESSAGE_COUNT);
+
+        assertTrue("At least message " + MESSAGE_COUNT + 
+                " must be recieved, duplicates are expected, count=" + msgs.getMessageCount(),
+                MESSAGE_COUNT <= msgs.getMessageCount());
+    }
+
+    @Override
+    protected void startAllBrokers() throws Exception {
+        // Ensure HUB is started first so bridge will be active from the get go
+        BrokerItem brokerItem = brokers.get(HUB);
+        brokerItem.broker.start();
+        brokerItem = brokers.get(SPOKE);
+        brokerItem.broker.start();
+    }
+
+    /**
+     * Enables auto-fail and creates the non-persistent hub and spoke brokers,
+     * each with a single <code>tcpfaulty</code> connector.
+     */
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
+        createBroker(new URI("broker:(tcpfaulty://localhost:61617)/" + HUB + options));
+        createBroker(new URI("broker:(tcpfaulty://localhost:61616)/" + SPOKE + options));
+    }
+
+    public static Test suite() {
+        return suite(MulticastDiscoveryOnFaultyNetworkTest.class);
+    }
+
+    /**
+     * Pauses 50 ms around each send to spread the messages out over time.
+     */
+    @Override
+    protected void onSend(int i, TextMessage msg) {
+        sleep(50);
+    }
+
+    /**
+     * Sleeps for the given time. An interrupt ends the sleep early and the
+     * thread's interrupt flag is restored, so the interruption is not lost.
+     *
+     * @param milliSecondTime time to sleep, in milliseconds
+     */
+    private void sleep(int milliSecondTime) {
+        try {
+            Thread.sleep(milliSecondTime);
+        } catch (InterruptedException ignored) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Bridges the brokers through multicast discovery instead of a static URI:
+     * adds a discovery network connector to the local broker and publishes the
+     * remote broker's first transport connector on the same multicast group.
+     *
+     * @param localBroker  broker that receives the network connector
+     * @param remoteBroker broker whose first transport connector is advertised
+     * @param dynamicOnly  forwarded to the network connector
+     * @param networkTTL   forwarded to the network connector
+     * @param conduit      not used by this override
+     * @return the network connector added to <code>localBroker</code>
+     */
+    @Override
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
+        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(DISCOVERY_URI));
+        connector.setDynamicOnly(dynamicOnly);
+        connector.setNetworkTTL(networkTTL);
+        localBroker.addNetworkConnector(connector);
+        maxSetupTime = 2000;
+        if (useDuplexNetworkBridge) {
+            connector.setDuplex(true);
+        }
+
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        if (!transportConnectors.isEmpty()) {
+            mCastTrpConnector = transportConnectors.get(0);
+            mCastTrpConnector.setDiscoveryUri(new URI(DISCOVERY_URI));
+        }
+        return connector;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty?rev=980014&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
(added)
+++ activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.tcp.TcpFaultyTransportFactory



Mime
View raw message