activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r617015 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport: ./ failover/ fanout/ mock/ vm/
Date Thu, 31 Jan 2008 06:34:15 GMT
Author: rajdavies
Date: Wed Jan 30 22:34:12 2008
New Revision: 617015

URL: http://svn.apache.org/viewvc?rev=617015&view=rev
Log:
added isDisposed to Transport interface

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
Wed Jan 30 22:34:12 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.activemq.Service;
 
@@ -135,5 +136,17 @@
      * @return true if fault tolerant
      */
     boolean isFaultTolerant();
+    
+    /**
+     * @return true if the transport is disposed
+     */
+    boolean isDisposed();
+    
+    /**
+     * reconnect to another location
+     * @param uri
+     * @throws IOException on failure or if not supported
+     */
+    void reconnect(URI uri) throws IOException;
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
Wed Jan 30 22:34:12 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.net.URI;
 
 /**
  * @version $Revision: 1.5 $
@@ -124,4 +125,12 @@
     public boolean isFaultTolerant() {
         return next.isFaultTolerant();
     }
+
+	public boolean isDisposed() {
+		return next.isDisposed();
+	}
+
+	public void reconnect(URI uri) throws IOException {
+		next.reconnect(uri);
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
Wed Jan 30 22:34:12 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
@@ -105,5 +106,14 @@
     public boolean isFaultTolerant() {
         return false;
     }
+    
+   
+	public void reconnect(URI uri) throws IOException {
+		throw new IOException("Not supported");
+	}
+	
+	public boolean isDisposed() {
+		return isStopped();
+	}
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
Wed Jan 30 22:34:12 2008
@@ -18,13 +18,28 @@
 
 package org.apache.activemq.transport.failover;
 
+import java.io.IOException;
 import java.net.URI;
 
+import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
 
-public class BackupTransport {
+class BackupTransport extends DefaultTransportListener{
+	private FailoverTransport failoverTransport;
 	private Transport transport;
 	private URI uri;
+	private boolean disposed;
+	
+	BackupTransport(FailoverTransport ft){
+		this.failoverTransport=ft;
+	}
+	public void onException(IOException error) {
+		this.disposed=true;
+		if (failoverTransport!=null) {
+			this.failoverTransport.reconnect();
+		}
+	}
+
 	public Transport getTransport() {
 		return transport;
 	}
@@ -36,6 +51,14 @@
 	}
 	public void setUri(URI uri) {
 		this.uri = uri;
+	}
+	
+	public boolean isDisposed() {
+		return disposed || transport != null && transport.isDisposed();
+	}
+	
+	public void setDisposed(boolean disposed) {
+		this.disposed = disposed;
 	}
 	
 	public int hashCode() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Wed Jan 30 22:34:12 2008
@@ -35,7 +35,6 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transport.CompositeTransport;
-import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
@@ -66,6 +65,7 @@
     private final ConcurrentHashMap<Integer, Command> requestMap = new ConcurrentHashMap<Integer,
Command>();
 
     private URI connectedTransportURI;
+    private URI failedConnectTransportURI;
     private Transport connectedTransport;
     private final TaskRunner reconnectTask;
     private boolean started;
@@ -96,9 +96,17 @@
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new
Task() {
             public boolean iterate() {
-            	boolean result = doReconnect();
-            	if(!result) {
+            	boolean result=false;
+            	boolean buildBackup=true;
+            	if (connectedTransport==null && !disposed) {
+            		result=doReconnect();
+            		buildBackup=false;
+            	}
+            	if(buildBackup) {
             		buildBackups();
+            	}else {
+            		//build backups on the next iteration
+            		result=true;
             	}
             	return result;
             }
@@ -171,6 +179,7 @@
             if (connectedTransport != null) {
                 initialized = false;
                 ServiceSupport.dispose(connectedTransport);
+                failedConnectTransportURI=connectedTransportURI;
                 connectedTransport = null;
                 connectedTransportURI = null;
             }
@@ -441,8 +450,8 @@
     private List<URI> getConnectList() {
         ArrayList<URI> l = new ArrayList<URI>(uris);
         boolean removed = false;
-        if (connectedTransportURI != null) {
-            removed = l.remove(connectedTransportURI);
+        if (failedConnectTransportURI != null) {
+            removed = l.remove(failedConnectTransportURI);
         }
         if (randomize) {
             // Randomly, reorder the list by random swapping
@@ -456,7 +465,7 @@
             }
         }
         if (removed) {
-            l.add(connectedTransportURI);
+            l.add(failedConnectTransportURI);
         }
         return l;
     }
@@ -544,6 +553,7 @@
                                     restoreTransport(t);  
                             }
                             reconnectDelay = initialReconnectDelay;
+                            failedConnectTransportURI=null;
                             connectedTransportURI = uri;
                             connectedTransport = t;
                             reconnectMutex.notifyAll();
@@ -625,17 +635,26 @@
    
    final boolean buildBackups() {
 	   synchronized (reconnectMutex) {
-		   if (backup && backups.size() < backupPoolSize) {
+		   if (!disposed && backup && backups.size() < backupPoolSize) {
 			   List<URI> connectList = getConnectList();
+			   //remove disposed backups
+			   List<BackupTransport>disposedList = new ArrayList<BackupTransport>();
+			   for (BackupTransport bt:backups) {
+				   if (bt.isDisposed()) {
+					   disposedList.add(bt);
+				   }
+			   }
+			   backups.removeAll(disposedList);
+			   disposedList.clear();
 			   for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size()
< backupPoolSize;) {
 				   URI uri = iter.next();
 				   if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
 					   try {
-						   BackupTransport bt = new BackupTransport();
+						   BackupTransport bt = new BackupTransport(this);
 						   bt.setUri(uri);
 						   if (!backups.contains(bt)) {
 							   Transport t = TransportFactory.compositeConnect(uri);
-		                       t.setTransportListener(new DefaultTransportListener());
+		                       t.setTransportListener(bt);
 		                       t.start();
 		                       bt.setTransport(t);
 		                       backups.add(bt);
@@ -649,6 +668,14 @@
 	   }
 	   return false;
    }
+
+public boolean isDisposed() {
+	return disposed;
+}
+
+public void reconnect(URI uri) throws IOException {
+	add(new URI[] {uri});
+}
 
 
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
Wed Jan 30 22:34:12 2008
@@ -541,6 +541,12 @@
         }
 
     }
+    
+    public void reconnect(URI uri) throws IOException {
+		add(new URI[]{uri});
+		
+	}
+
 
     public String getRemoteAddress() {
         if (primary != null) {
@@ -569,4 +575,7 @@
         this.fanOutQueues = fanOutQueues;
     }
 
+	public boolean isDisposed() {
+		return disposed;
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
Wed Jan 30 22:34:12 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.mock;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
@@ -138,5 +139,13 @@
     public boolean isFaultTolerant() {
         return getNext().isFaultTolerant();
     }
+
+	public boolean isDisposed() {
+		return getNext().isDisposed();
+	}
+
+	public void reconnect(URI uri) throws IOException {
+		getNext().reconnect(uri);
+	}
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=617015&r1=617014&r2=617015&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Wed Jan 30 22:34:12 2008
@@ -327,4 +327,12 @@
     public boolean isFaultTolerant() {
         return false;
     }
+
+	public boolean isDisposed() {
+		return disposed;
+	}
+
+	public void reconnect(URI uri) throws IOException {
+		throw new IOException("Not supported");
+	}
 }



Mime
View raw message