activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r920306 [2/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/co...
Date Mon, 08 Mar 2010 12:48:46 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Mar  8 12:48:45 2010
@@ -19,15 +19,18 @@
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.InetAddress;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionId;
@@ -50,6 +53,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+
 /**
  * A Transport that is made reliable by being able to fail over to another
  * transport when a transport failure is detected.
@@ -64,6 +68,7 @@
     private boolean disposed;
     private boolean connected;
     private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
+    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();
 
     private final Object reconnectMutex = new Object();
     private final Object backupMutex = new Object();
@@ -77,28 +82,28 @@
     private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
     private final TaskRunner reconnectTask;
     private boolean started;
-
+    private boolean initialized;
     private long initialReconnectDelay = 10;
     private long maxReconnectDelay = 1000 * 30;
     private double backOffMultiplier = 2d;
     private long timeout = -1;
     private boolean useExponentialBackOff = true;
     private boolean randomize = true;
-    private boolean initialized;
     private int maxReconnectAttempts;
     private int startupMaxReconnectAttempts;
     private int connectFailures;
     private long reconnectDelay = this.initialReconnectDelay;
     private Exception connectionFailure;
     private boolean firstConnection = true;
-    //optionally always have a backup created
-    private boolean backup=false;
-    private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
-    private int backupPoolSize=1;
+    // optionally always have a backup created
+    private boolean backup = false;
+    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
+    private int backupPoolSize = 1;
     private boolean trackMessages = false;
     private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
-    private TransportListener disposedListener = new DefaultTransportListener() {};
+    private final TransportListener disposedListener = new DefaultTransportListener() {
+    };
     private boolean connectionInterruptProcessingComplete;
 
     private final TransportListener myTransportListener = createTransportListener();
@@ -109,27 +114,27 @@
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
             public boolean iterate() {
-            	boolean result=false;
-            	boolean buildBackup=true;
-            	boolean doReconnect = !disposed;
-            	synchronized(backupMutex) {
-                	if (connectedTransport.get()==null && !disposed) {
-                		result=doReconnect();
-                		buildBackup=false;
-                	}
-            	}
-            	if(buildBackup) {
-            		buildBackups();
-            	}else {
-            		//build backups on the next iteration
-            		result=true;
-            		try {
+                boolean result = false;
+                boolean buildBackup = true;
+                boolean doReconnect = !disposed;
+                synchronized (backupMutex) {
+                    if (connectedTransport.get() == null && !disposed) {
+                        result = doReconnect();
+                        buildBackup = false;
+                    }
+                }
+                if (buildBackup) {
+                    buildBackups();
+                } else {
+                    // build backups on the next iteration
+                    result = true;
+                    try {
                         reconnectTask.wakeup();
                     } catch (InterruptedException e) {
                         LOG.debug("Reconnect task has been interrupted.", e);
                     }
-            	}
-            	return result;
+                }
+                return result;
             }
 
         }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
@@ -138,32 +143,25 @@
     TransportListener createTransportListener() {
         return new TransportListener() {
             public void onCommand(Object o) {
-                Command command = (Command)o;
+                Command command = (Command) o;
                 if (command == null) {
                     return;
                 }
                 if (command.isResponse()) {
                     Object object = null;
-                    synchronized(requestMap) {
-                     object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
+                    synchronized (requestMap) {
+                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
                     }
                     if (object != null && object.getClass() == Tracked.class) {
-                        ((Tracked)object).onResponses();
+                        ((Tracked) object).onResponses();
                     }
                 }
-                if (!initialized) {
-                    if (command.isBrokerInfo()) {
-                        BrokerInfo info = (BrokerInfo)command;
-                        BrokerInfo[] peers = info.getPeerBrokerInfos();
-                        if (peers != null) {
-                            for (int i = 0; i < peers.length; i++) {
-                                String brokerString = peers[i].getBrokerURL();
-                                add(brokerString);
-                            }
-                        }
-                        initialized = true;
-                    }
-
+                if (!initialized) {      
+                    initialized = true;
+                }
+                
+                if(command.isConnectionControl()) {
+                    handleConnectionControl((ConnectionControl) command);
                 }
                 if (transportListener != null) {
                     transportListener.onCommand(command);
@@ -193,7 +191,6 @@
         };
     }
 
-
     public final void handleTransportFailure(IOException e) throws InterruptedException {
         if (LOG.isTraceEnabled()) {
             LOG.trace(this + " handleTransportFailure: " + e);
@@ -202,39 +199,83 @@
         if (transport == null) {
             // sync with possible in progress reconnect
             synchronized (reconnectMutex) {
-                transport = connectedTransport.getAndSet(null); 
+                transport = connectedTransport.getAndSet(null);
             }
         }
         if (transport != null) {
-            
+
             transport.setTransportListener(disposedListener);
             ServiceSupport.dispose(transport);
-            
+
             boolean reconnectOk = false;
             synchronized (reconnectMutex) {
-                if(started) {
-                    LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
+                if (started) {
+                    LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI
+                            + " , attempting to automatically reconnect due to: " + e);
                     LOG.debug("Transport failed with the following exception:", e);
                     reconnectOk = true;
-                }          
+                }
                 initialized = false;
-                failedConnectTransportURI=connectedTransportURI;
+                failedConnectTransportURI = connectedTransportURI;
                 connectedTransportURI = null;
-                connected=false;
+                connected = false;
 
                 stateTracker.transportInterrupted();
 
-                // notify before any reconnect attempt so ack state can be whacked
+                // notify before any reconnect attempt so ack state can be
+                // whacked
                 if (transportListener != null) {
                     transportListener.transportInterupted();
                 }
-            
+
                 if (reconnectOk) {
                     reconnectTask.wakeup();
                 }
             }
         }
+    }
+
+    public final void handleConnectionControl(ConnectionControl control) {
+        String reconnectStr = control.getReconnectTo();
+        if (reconnectStr != null) {
+            reconnectStr = reconnectStr.trim();
+            if (reconnectStr.length() > 0) {
+                try {
+                    URI uri = new URI(reconnectStr);
+                    if (isReconnectSupported()) {
+                        reconnect(uri);
+                        LOG.info("Reconnected to: " + uri);
+                    }
+                } catch (Exception e) {
+                    LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
+                }
+            }
+        }
+        String connectedStr = control.getConnectedBrokers();
+        if (connectedStr != null) {
+            connectedStr = connectedStr.trim();
+            if (connectedStr.length() > 0 && isUpdateURIsSupported()) {
+                List<URI> list = new ArrayList<URI>();
+                StringTokenizer tokenizer = new StringTokenizer(connectedStr, ",");
+                while (tokenizer.hasMoreTokens()) {
+                    String str = tokenizer.nextToken();
+                    try {
+                        URI uri = new URI(str);
+                        list.add(uri);
+                    } catch (Exception e) {
+                        LOG.error("Failed to parse broker address: " + str, e);
+                    }
+                }
+                if (list.isEmpty() == false) {
+                    try {
+                        updateURIs(control.isRebalanceConnection(), list.toArray(new URI[list.size()]));
+                    } catch (IOException e) {
+                        LOG.error("Failed to update transport URI's from: " + connectedStr, e);
+                    }
+                }
 
+            }
+        }
     }
 
     public void start() throws Exception {
@@ -250,13 +291,13 @@
             if (connectedTransport.get() != null) {
                 stateTracker.restore(connectedTransport.get());
             } else {
-                reconnect();
+                reconnect(false);
             }
         }
     }
 
     public void stop() throws Exception {
-        Transport transportToStop=null;
+        Transport transportToStop = null;
         synchronized (reconnectMutex) {
             LOG.debug("Stopped.");
             if (!started) {
@@ -265,7 +306,7 @@
             started = false;
             disposed = true;
             connected = false;
-            for (BackupTransport t:backups) {
+            for (BackupTransport t : backups) {
                 t.setDisposed(true);
             }
             backups.clear();
@@ -279,7 +320,7 @@
             sleepMutex.notifyAll();
         }
         reconnectTask.shutdown();
-        if( transportToStop!=null ) {
+        if (transportToStop != null) {
             transportToStop.stop();
         }
     }
@@ -331,7 +372,7 @@
     public void setMaxReconnectAttempts(int maxReconnectAttempts) {
         this.maxReconnectAttempts = maxReconnectAttempts;
     }
-    
+
     public int getStartupMaxReconnectAttempts() {
         return this.startupMaxReconnectAttempts;
     }
@@ -341,14 +382,14 @@
     }
 
     public long getTimeout() {
-		return timeout;
-	}
+        return timeout;
+    }
 
-	public void setTimeout(long timeout) {
-		this.timeout = timeout;
-	}
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
 
-	/**
+    /**
      * @return Returns the randomize.
      */
     public boolean isRandomize() {
@@ -356,29 +397,30 @@
     }
 
     /**
-     * @param randomize The randomize to set.
+     * @param randomize
+     *            The randomize to set.
      */
     public void setRandomize(boolean randomize) {
         this.randomize = randomize;
     }
-    
+
     public boolean isBackup() {
-		return backup;
-	}
+        return backup;
+    }
 
-	public void setBackup(boolean backup) {
-		this.backup = backup;
-	}
-
-	public int getBackupPoolSize() {
-		return backupPoolSize;
-	}
-
-	public void setBackupPoolSize(int backupPoolSize) {
-		this.backupPoolSize = backupPoolSize;
-	}
-	
-	public boolean isTrackMessages() {
+    public void setBackup(boolean backup) {
+        this.backup = backup;
+    }
+
+    public int getBackupPoolSize() {
+        return backupPoolSize;
+    }
+
+    public void setBackupPoolSize(int backupPoolSize) {
+        this.backupPoolSize = backupPoolSize;
+    }
+
+    public boolean isTrackMessages() {
         return trackMessages;
     }
 
@@ -401,31 +443,32 @@
     public void setMaxCacheSize(int maxCacheSize) {
         this.maxCacheSize = maxCacheSize;
     }
-	
+
     /**
-     * @return Returns true if the command is one sent when a connection
-     * is being closed.
+     * @return Returns true if the command is one sent when a connection is
+     *         being closed.
      */
     private boolean isShutdownCommand(Command command) {
-	return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
+        return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
     }
-	 
 
     public void oneway(Object o) throws IOException {
-        
-        Command command = (Command)o;
+
+        Command command = (Command) o;
         Exception error = null;
         try {
 
             synchronized (reconnectMutex) {
-            	
+
                 if (isShutdownCommand(command) && connectedTransport.get() == null) {
-                    if(command.isShutdownInfo()) {
-                        // Skipping send of ShutdownInfo command when not connected.
+                    if (command.isShutdownInfo()) {
+                        // Skipping send of ShutdownInfo command when not
+                        // connected.
                         return;
                     }
-                    if(command instanceof RemoveInfo || command.isMessageAck()) {
-                        // Simulate response to RemoveInfo command or ack (as it will be stale)
+                    if (command instanceof RemoveInfo || command.isMessageAck()) {
+                        // Simulate response to RemoveInfo command or ack (as it
+                        // will be stale)
                         stateTracker.track(command);
                         Response response = new Response();
                         response.setCorrelationId(command.getCommandId());
@@ -441,15 +484,14 @@
                         Transport transport = connectedTransport.get();
                         long start = System.currentTimeMillis();
                         boolean timedout = false;
-                        while (transport == null && !disposed
-                                && connectionFailure == null
+                        while (transport == null && !disposed && connectionFailure == null
                                 && !Thread.currentThread().isInterrupted()) {
                             LOG.trace("Waiting for transport to reconnect..: " + command);
                             long end = System.currentTimeMillis();
                             if (timeout > 0 && (end - start > timeout)) {
-                            	timedout = true;
-                            	LOG.info("Failover timed out after " + (end - start) + "ms");
-                            	break;
+                                timedout = true;
+                                LOG.info("Failover timed out after " + (end - start) + "ms");
+                                break;
                             }
                             try {
                                 reconnectMutex.wait(100);
@@ -468,8 +510,8 @@
                             } else if (connectionFailure != null) {
                                 error = connectionFailure;
                             } else if (timedout == true) {
-                            	error = new IOException("Failover timeout of " + timeout + " ms reached.");
-                            }else {
+                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
+                            } else {
                                 error = new IOException("Unexpected failure.");
                             }
                             break;
@@ -480,7 +522,7 @@
                         // then hold it in the requestMap so that we can replay
                         // it later.
                         Tracked tracked = stateTracker.track(command);
-                        synchronized(requestMap) {
+                        synchronized (requestMap) {
                             if (tracked != null && tracked.isWaitingForResponse()) {
                                 requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
                             } else if (tracked == null && command.isResponseRequired()) {
@@ -531,7 +573,7 @@
         if (!disposed) {
             if (error != null) {
                 if (error instanceof IOException) {
-                    throw (IOException)error;
+                    throw (IOException) error;
                 }
                 throw IOExceptionSupport.create(error);
             }
@@ -550,38 +592,53 @@
         throw new AssertionError("Unsupported Method");
     }
 
-    public void add(URI u[]) {
+    public void add(boolean rebalance, URI u[]) {
+        boolean newURI = false;
         for (int i = 0; i < u.length; i++) {
-            if (!uris.contains(u[i])) {
-                uris.add(u[i]);
+            if (contains(u[i])==false) {
+                uris.add(i, u[i]);
+                newURI = true;
             }
         }
-        reconnect();
+        if (newURI) {
+            reconnect(rebalance);
+        }
     }
 
-    public void remove(URI u[]) {
+    public void remove(boolean rebalance, URI u[]) {
         for (int i = 0; i < u.length; i++) {
             uris.remove(u[i]);
         }
-        reconnect();
+        reconnect(rebalance);
     }
 
-    public void add(String u) {
+    public void add(boolean rebalance, String u) {
         try {
-            URI uri = new URI(u);
-            if (!uris.contains(uri)) {
-                uris.add(uri);
+            URI newURI = new URI(u);
+            if (contains(newURI)==false) {
+                uris.add(newURI);
+                reconnect(rebalance);
             }
-
-            reconnect();
+       
         } catch (Exception e) {
             LOG.error("Failed to parse URI: " + u);
         }
     }
 
-    public void reconnect() {
+    public void reconnect(boolean rebalance) {
         synchronized (reconnectMutex) {
             if (started) {
+                if (rebalance) {
+                    Transport transport = this.connectedTransport.getAndSet(null);
+                    if (transport != null) {
+                        try {
+                            transport.stop();
+                        } catch (Exception e) {
+                            LOG.debug("Caught an exception stopping existing transport", e);
+                        }
+                    }
+
+                }
                 LOG.debug("Waking up reconnect task");
                 try {
                     reconnectTask.wakeup();
@@ -603,7 +660,7 @@
         if (randomize) {
             // Randomly, reorder the list by random swapping
             for (int i = 0; i < l.size(); i++) {
-                int p = (int) (Math.random()*100 % l.size());
+                int p = (int) (Math.random() * 100 % l.size());
                 URI t = l.get(p);
                 l.set(p, l.get(i));
                 l.set(i, t);
@@ -621,7 +678,7 @@
     }
 
     public void setTransportListener(TransportListener commandListener) {
-        synchronized(listenerMutex) {
+        synchronized (listenerMutex) {
             this.transportListener = commandListener;
             listenerMutex.notifyAll();
         }
@@ -633,7 +690,7 @@
             return target.cast(this);
         }
         Transport transport = connectedTransport.get();
-        if ( transport != null) {
+        if (transport != null) {
             return transport.narrow(target);
         }
         return null;
@@ -642,13 +699,13 @@
 
     protected void restoreTransport(Transport t) throws Exception, IOException {
         t.start();
-        //send information to the broker - informing it we are an ft client
+        // send information to the broker - informing it we are an ft client
         ConnectionControl cc = new ConnectionControl();
         cc.setFaultTolerant(true);
         t.oneway(cc);
         stateTracker.restore(t);
         Map tmpMap = null;
-        synchronized(requestMap) {
+        synchronized (requestMap) {
             tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
         }
         for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
@@ -668,13 +725,14 @@
         this.useExponentialBackOff = useExponentialBackOff;
     }
 
+    @Override
     public String toString() {
         return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
     }
 
     public String getRemoteAddress() {
         Transport transport = connectedTransport.get();
-        if ( transport != null) {
+        if (transport != null) {
             return transport.getRemoteAddress();
         }
         return null;
@@ -683,8 +741,8 @@
     public boolean isFaultTolerant() {
         return true;
     }
-    
-   final boolean doReconnect() {
+
+    final boolean doReconnect() {
         Exception failure = null;
         synchronized (reconnectMutex) {
 
@@ -702,32 +760,32 @@
                     if (!useExponentialBackOff) {
                         reconnectDelay = initialReconnectDelay;
                     }
-                    synchronized(backupMutex) {
+                    synchronized (backupMutex) {
                         if (backup && !backups.isEmpty()) {
-                        	BackupTransport bt = backups.remove(0);
+                            BackupTransport bt = backups.remove(0);
                             Transport t = bt.getTransport();
                             URI uri = bt.getUri();
                             t.setTransportListener(myTransportListener);
                             try {
-                                if (started) { 
-                                    restoreTransport(t);  
+                                if (started) {
+                                    restoreTransport(t);
                                 }
                                 reconnectDelay = initialReconnectDelay;
-                                failedConnectTransportURI=null;
+                                failedConnectTransportURI = null;
                                 connectedTransportURI = uri;
                                 connectedTransport.set(t);
                                 reconnectMutex.notifyAll();
                                 connectFailures = 0;
                                 LOG.info("Successfully reconnected to backup " + uri);
                                 return false;
-                            }catch (Exception e) {
-                                LOG.debug("Backup transport failed",e);
-                             }
+                            } catch (Exception e) {
+                                LOG.debug("Backup transport failed", e);
+                            }
                         }
                     }
-                    
+
                     Iterator<URI> iter = connectList.iterator();
-                    while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
+                    while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
                         URI uri = iter.next();
                         Transport t = null;
                         try {
@@ -735,7 +793,7 @@
                             t = TransportFactory.compositeConnect(uri);
                             t.setTransportListener(myTransportListener);
                             t.start();
-                            
+
                             if (started) {
                                 restoreTransport(t);
                             }
@@ -746,36 +804,38 @@
                             connectedTransport.set(t);
                             reconnectMutex.notifyAll();
                             connectFailures = 0;
-                         // Make sure on initial startup, that the transportListener 
-                         // has been initialized for this instance.
-                            synchronized(listenerMutex) {
-                                if (transportListener==null) {
+                            // Make sure on initial startup, that the
+                            // transportListener
+                            // has been initialized for this instance.
+                            synchronized (listenerMutex) {
+                                if (transportListener == null) {
                                     try {
-                                        //if it isn't set after 2secs - it
-                                        //probably never will be
+                                        // if it isn't set after 2secs - it
+                                        // probably never will be
                                         listenerMutex.wait(2000);
-                                    }catch(InterruptedException ex) {}
+                                    } catch (InterruptedException ex) {
+                                    }
                                 }
                             }
                             if (transportListener != null) {
                                 transportListener.transportResumed();
-                            }else {
+                            } else {
                                 LOG.debug("transport resumed by transport listener not set");
                             }
                             if (firstConnection) {
-                                firstConnection=false;
+                                firstConnection = false;
                                 LOG.info("Successfully connected to " + uri);
-                            }else {
+                            } else {
                                 LOG.info("Successfully reconnected to " + uri);
                             }
-                            connected=true;
+                            connected = true;
                             return false;
                         } catch (Exception e) {
                             failure = e;
                             LOG.debug("Connect fail to: " + uri + ", reason: " + e);
-                            if (t!=null) {
+                            if (t != null) {
                                 try {
-                                    t.stop();       
+                                    t.stop();
                                 } catch (Exception ee) {
                                     LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
                                 }
@@ -790,31 +850,32 @@
                     reconnectAttempts = this.startupMaxReconnectAttempts;
                 }
             }
-            if (reconnectAttempts==0) {
+            if (reconnectAttempts == 0) {
                 reconnectAttempts = this.maxReconnectAttempts;
-            }            
+            }
             if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
                 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                 connectionFailure = failure;
- 	
-                // Make sure on initial startup, that the transportListener has been initialized
+
+                // Make sure on initial startup, that the transportListener has
+                // been initialized
                 // for this instance.
-                synchronized(listenerMutex) {
-                    if (transportListener==null) {
+                synchronized (listenerMutex) {
+                    if (transportListener == null) {
                         try {
                             listenerMutex.wait(2000);
-                        }catch(InterruptedException ex) {}
+                        } catch (InterruptedException ex) {
+                        }
                     }
                 }
 
-          
-                if(transportListener != null) {
+                if (transportListener != null) {
                     if (connectionFailure instanceof IOException) {
-                    	transportListener.onException((IOException)connectionFailure);
+                        transportListener.onException((IOException) connectionFailure);
                     } else {
-                    	transportListener.onException(IOExceptionSupport.create(connectionFailure));
+                        transportListener.onException(IOExceptionSupport.create(connectionFailure));
                     }
-                }        
+                }
                 reconnectMutex.notifyAll();
                 return false;
             }
@@ -841,59 +902,92 @@
         return !disposed;
     }
 
-   
-   final boolean buildBackups() {
-	   synchronized (backupMutex) {
-		   if (!disposed && backup && backups.size() < backupPoolSize) {
-			   List<URI> connectList = getConnectList();
-			   //removed disposed backups
-			   List<BackupTransport>disposedList = new ArrayList<BackupTransport>();
-			   for (BackupTransport bt:backups) {
-				   if (bt.isDisposed()) {
-					   disposedList.add(bt);
-				   }
-			   }
-			   backups.removeAll(disposedList);
-			   disposedList.clear();
-			   for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
-				   URI uri = iter.next();
-				   if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
-					   try {
-						   BackupTransport bt = new BackupTransport(this);
-						   bt.setUri(uri);
-						   if (!backups.contains(bt)) {
-							   Transport t = TransportFactory.compositeConnect(uri);
-		                       t.setTransportListener(bt);
-		                       t.start();
-		                       bt.setTransport(t);
-		                       backups.add(bt);
-						   }
-					   } catch(Exception e) {
-						   LOG.debug("Failed to build backup ",e);
-					   }
-				   }
-			   }
-		   }
-	   }
-	   return false;
-   }
+    final boolean buildBackups() {
+        synchronized (backupMutex) {
+            if (!disposed && backup && backups.size() < backupPoolSize) {
+                List<URI> connectList = getConnectList();
+                // removed disposed backups
+                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
+                for (BackupTransport bt : backups) {
+                    if (bt.isDisposed()) {
+                        disposedList.add(bt);
+                    }
+                }
+                backups.removeAll(disposedList);
+                disposedList.clear();
+                for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
+                    URI uri = iter.next();
+                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
+                        try {
+                            BackupTransport bt = new BackupTransport(this);
+                            bt.setUri(uri);
+                            if (!backups.contains(bt)) {
+                                Transport t = TransportFactory.compositeConnect(uri);
+                                t.setTransportListener(bt);
+                                t.start();
+                                bt.setTransport(t);
+                                backups.add(bt);
+                            }
+                        } catch (Exception e) {
+                            LOG.debug("Failed to build backup ", e);
+                        }
+                    }
+                }
+            }
+        }
+        return false;
+    }
 
     public boolean isDisposed() {
-    	return disposed;
+        return disposed;
     }
-    
-    
+
     public boolean isConnected() {
         return connected;
     }
-    
+
     public void reconnect(URI uri) throws IOException {
-    	add(new URI[] {uri});
+        add(true, new URI[] { uri });
+    }
+
+    public boolean isReconnectSupported() {
+        return true;
+    }
+
+    public boolean isUpdateURIsSupported() {
+        return true;
+    }
+
+    public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
+        List<URI> copy = new ArrayList<URI>(this.updated);
+        List<URI> add = new ArrayList<URI>();
+        if (updatedURIs != null && updatedURIs.length > 0) {
+            Set<URI> set = new HashSet<URI>();
+            for (int i = 0; i < updatedURIs.length; i++) {
+                URI uri = updatedURIs[i];
+                if (uri != null) {
+                    set.add(uri);
+                }
+            }
+            for (URI uri : set) {
+                if (copy.remove(uri) == false) {
+                    add.add(uri);
+                }
+            }
+            synchronized (reconnectMutex) {
+                this.updated.clear();
+                this.updated.addAll(add);
+                for (URI uri : copy) {
+                    this.uris.remove(uri);
+                }
+                add(rebalance, add.toArray(new URI[add.size()]));
+            }
+        }
     }
 
     public int getReceiveCounter() {
         Transport transport = connectedTransport.get();
-        if( transport == null ) {
+        if (transport == null) {
             return 0;
         }
         return transport.getReceiveCounter();
@@ -904,4 +998,25 @@
             stateTracker.connectionInterruptProcessingComplete(this, connectionId);
         }
     }
+    
+    private boolean contains(URI newURI) {
+       
+        boolean result = false;
+        try {
+        for (URI uri:uris) {
+            if (newURI.getPort()==uri.getPort()) {
+                InetAddress newAddr = InetAddress.getByName(newURI.getHost());
+                InetAddress addr = InetAddress.getByName(uri.getHost());
+                if (addr.equals(newAddr)) {
+                    result = true;
+                    break;
+                }
+            }
+        }
+        }catch(IOException e) {
+            result = true;
+            LOG.error("Failed to verify URI " + newURI + " already known: " + e);
+        }
+        return result;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java Mon Mar  8 12:48:45 2010
@@ -20,7 +20,6 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
-
 import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
@@ -32,6 +31,7 @@
 
 public class FailoverTransportFactory extends TransportFactory {
 
+    @Override
     public Transport doConnect(URI location) throws IOException {
         try {
             Transport transport = createTransport(URISupport.parseComposite(location));
@@ -43,6 +43,7 @@
         }
     }
 
+    @Override
     public Transport doCompositeConnect(URI location) throws IOException {
         try {
             return createTransport(URISupport.parseComposite(location));
@@ -62,7 +63,7 @@
         if (!options.isEmpty()) {
             throw new IllegalArgumentException("Invalid connect parameters: " + options);
         }
-        transport.add(compositData.getComponents());
+        transport.add(false,compositData.getComponents());
         return transport;
     }
 
@@ -72,6 +73,7 @@
         return transport;
     }
 
+    @Override
     public TransportServer doBind(URI location) throws IOException {
         throw new IOException("Invalid server URI: " + location);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Mon Mar  8 12:48:45 2010
@@ -23,7 +23,6 @@
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -65,7 +64,7 @@
     private final TaskRunner reconnectTask;
     private boolean started;
 
-    private ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
+    private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
     private int connectedCount;
 
     private int minAckCount = 2;
@@ -73,7 +72,7 @@
     private long initialReconnectDelay = 10;
     private long maxReconnectDelay = 1000 * 30;
     private long backOffMultiplier = 2;
-    private boolean useExponentialBackOff = true;
+    private final boolean useExponentialBackOff = true;
     private int maxReconnectAttempts;
     private Exception connectionFailure;
     private FanoutTransportHandler primary;
@@ -89,6 +88,7 @@
             this.ackCount = new AtomicInteger(count);
         }
 
+        @Override
         public String toString() {
             return command.getCommandId() + "=" + ackCount.get();
         }
@@ -107,6 +107,7 @@
             this.uri = uri;
         }
 
+        @Override
         public void onCommand(Object o) {
             Command command = (Command)o;
             if (command.isResponse()) {
@@ -125,6 +126,7 @@
             }
         }
 
+        @Override
         public void onException(IOException error) {
             try {
                 synchronized (reconnectMutex) {
@@ -499,7 +501,7 @@
         }
     }
 
-    public void add(URI uris[]) {
+    public void add(boolean reblance,URI uris[]) {
 
         synchronized (reconnectMutex) {
             for (int i = 0; i < uris.length; i++) {
@@ -523,7 +525,7 @@
 
     }
 
-    public void remove(URI uris[]) {
+    public void remove(boolean rebalance,URI uris[]) {
 
         synchronized (reconnectMutex) {
             for (int i = 0; i < uris.length; i++) {
@@ -546,9 +548,20 @@
     }
     
     public void reconnect(URI uri) throws IOException {
-		add(new URI[]{uri});
+		add(true,new URI[]{uri});
 		
 	}
+    
+    public boolean isReconnectSupported() {
+        return true;
+    }
+
+    public boolean isUpdateURIsSupported() {
+        return true;
+    }
+    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+        add(reblance,uris);
+    }
 
 
     public String getRemoteAddress() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Mon Mar  8 12:48:45 2010
@@ -18,7 +18,6 @@
 
 import java.io.IOException;
 import java.net.URI;
-
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -70,6 +69,7 @@
         getNext().stop();
     }
 
+    @Override
     public void onCommand(Object command) {
         getTransportListener().onCommand(command);
     }
@@ -88,6 +88,7 @@
         return transportListener;
     }
 
+    @Override
     public String toString() {
         return getNext().toString();
     }
@@ -108,6 +109,7 @@
         return getNext().request(command, timeout);
     }
 
+    @Override
     public void onException(IOException error) {
         getTransportListener().onException(error);
     }
@@ -155,4 +157,16 @@
     public int getReceiveCounter() {
         return getNext().getReceiveCounter();
     }
+    
+
+    public boolean isReconnectSupported() {
+        return getNext().isReconnectSupported();
+    }
+
+    public boolean isUpdateURIsSupported() {
+        return getNext().isUpdateURIsSupported();
+    }
+    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+       getNext().updateURIs(reblance,uris);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java Mon Mar  8 12:48:45 2010
@@ -133,6 +133,10 @@
     public boolean isShutdownInfo() {
         return false;
     }
+    
+    public boolean isConnectionControl() {
+        return false;
+    }
 
     public boolean isWireFormatInfo() {
         return false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Mon Mar  8 12:48:45 2010
@@ -35,9 +35,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
 import javax.net.SocketFactory;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportLoggerFactory;
@@ -171,6 +169,7 @@
     /**
      * @return pretty print of 'this'
      */
+    @Override
     public String toString() {
         return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
     }
@@ -398,6 +397,7 @@
         }
     }
 
+    @Override
     protected void doStart() throws Exception {
         connect();
         stoppedLatch.set(new CountDownLatch(1));
@@ -454,6 +454,7 @@
         initializeStreams();
     }
 
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Stopping transport " + this);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Mon Mar  8 12:48:45 2010
@@ -21,7 +21,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -275,6 +274,7 @@
         this.network = network;
     }
 
+    @Override
     public String toString() {
         return location + "#" + id;
     }
@@ -342,8 +342,19 @@
 	}
 
 	public void reconnect(URI uri) throws IOException {
-		throw new IOException("Not supported");
-	}
+        throw new IOException("Not supported");
+    }
+
+    public boolean isReconnectSupported() {
+        return false;
+    }
+
+    public boolean isUpdateURIsSupported() {
+        return false;
+    }
+    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+        throw new IOException("Not supported");
+    }
 
     public int getReceiveCounter() {
         return receiveCounter;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java Mon Mar  8 12:48:45 2010
@@ -56,5 +56,8 @@
         info.setFaultTolerant(true);
         info.setResume(false);
         info.setSuspend(true);
+        info.setConnectedBrokers("ConnectedBrokers:1");
+        info.setReconnectTo("ReconnectTo:2");
+        info.setRebalanceConnection(false);
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java Mon Mar  8 12:48:45 2010
@@ -65,5 +65,6 @@
         info.setBrokerMasterConnector(true);
         info.setManageable(false);
         info.setClientMaster(true);
+        info.setFaultTolerant(false);
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java Mon Mar  8 12:48:45 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.perf;
 
 import java.io.File;
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 
@@ -30,6 +31,13 @@
         this.initialConsumerDelay = 10 * 1000;
         super.setUp();
     }
+    
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
+        ActiveMQConnectionFactory result = new ActiveMQConnectionFactory(uri);
+        //result.setDispatchAsync(false);
+        return result;
+    }
 
     @Override
     protected void configureBroker(BrokerService answer, String uri) throws Exception {
@@ -52,7 +60,7 @@
 
         // small batch means more frequent and smaller writes
         kaha.setIndexWriteBatchSize(100);
-        kaha.setIndexCacheSize(10000);
+        kaha.setIndexCacheSize(1000);
         // do the index write in a separate thread
         //kaha.setEnableIndexWriteAsync(true);
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java Mon Mar  8 12:48:45 2010
@@ -27,7 +27,7 @@
 
     @Override
     protected void setUp() throws Exception {
-        this.initialConsumerDelay = 10 * 1000;
+       // this.initialConsumerDelay = 10 * 1000;
         super.setUp();
     }
     @Override
@@ -43,7 +43,7 @@
         // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified 
         // what happens if the index is updated but a journal update is lost.
         // Index is going to be in consistent, but can it be repaired?
-        //kaha.setEnableJournalDiskSyncs(false);
+        kaha.setEnableJournalDiskSyncs(false);
         // Using a bigger journal file size makes he take fewer spikes as it is not switching files as often.
         //kaha.setJournalMaxFileLength(1024*1024*100);
         
@@ -51,6 +51,7 @@
         kaha.setIndexWriteBatchSize(100);
         // do the index write in a separate thread
         kaha.setEnableIndexWriteAsync(true);
+        kaha.setIndexCacheSize(10000);
         
         answer.setPersistenceAdapter(kaha);
         answer.addConnector(uri);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java?rev=920306&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java Mon Mar  8 12:48:45 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+public class RunBroker {
+
+    public static void main(String arg[]) {
+
+        try {
+        KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
+            File dataFileDir = new File("target/test-amq-data/perfTest/kahadb");
+            File archiveDir = new File(dataFileDir,"archive");
+            KahaDBStore kahaDB = new KahaDBStore();
+            kahaDB.setDirectory(dataFileDir);
+            kahaDB.setDirectoryArchive(archiveDir);
+            kahaDB.setArchiveDataLogs(true);
+
+            // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified 
+            // what happens if the index is updated but a journal update is lost.
+            // Index is going to be inconsistent, but can it be repaired?
+            //kaha.setEnableJournalDiskSyncs(false);
+            // Using a bigger journal file size makes it take fewer spikes as it is not switching files as often.
+            //kaha.setJournalMaxFileLength(1024*1024*100);
+            
+            // small batch means more frequent and smaller writes
+            kahaDB.setIndexWriteBatchSize(1000);
+            kahaDB.setIndexCacheSize(10000);
+            // do the index write in a separate thread
+            kahaDB.setEnableIndexWriteAsync(true);
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        //broker.setPersistenceAdapter(adaptor);
+        //broker.setPersistenceAdapter(kahaDB);
+        broker.setPersistent(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://0.0.0.0:61616");
+        broker.start();
+        System.err.println("Running");
+        Thread.sleep(Long.MAX_VALUE);
+        }catch(Throwable e) {
+            e.printStackTrace();
+        }
+
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java?rev=920306&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Mon Mar  8 12:48:45 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.NetworkConnector;
+
+
+public class FailoverClusterTest extends TestCase {
+
+private static final int NUMBER = 10;
+private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616";
+private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617";
+private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")";
+private static final String BROKER_A_NAME = "BROKERA";
+private static final String BROKER_B_NAME = "BROKERB";
+private BrokerService brokerA;
+private BrokerService brokerB;
+private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnection>();
+
+
+  public void testClusterConnectedAfterClients() throws Exception{
+      createClients();
+      if (brokerB == null) {
+          brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
+      }
+      Thread.sleep(3000);
+      Set<String> set = new HashSet<String>();
+      for (ActiveMQConnection c:connections) {
+          set.add(c.getTransportChannel().getRemoteAddress());
+      }
+      assertTrue(set.size() > 1);
+  }
+  
+  public void testClusterConnectedBeforeClients() throws Exception{
+      
+      if (brokerB == null) {
+          brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
+      }
+      Thread.sleep(5000);
+      createClients();
+      Thread.sleep(2000);
+      brokerA.stop();
+      Thread.sleep(2000);
+     
+      URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS);
+      for (ActiveMQConnection c:connections) {
+          String addr = c.getTransportChannel().getRemoteAddress();    
+          assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0);
+      }
+  }
+
+    @Override
+    protected void setUp() throws Exception {
+        if (brokerA == null) {
+           brokerA = createBrokerA(BROKER_A_BIND_ADDRESS);
+        }
+        
+        
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        for (Connection c:connections) {
+            c.close();
+        }     
+        if (brokerB != null) {
+            brokerB.stop();
+            brokerB = null;
+        }
+        if (brokerA != null) {
+            brokerA.stop();
+            brokerA = null;
+        }
+    }
+    
+    protected BrokerService createBrokerA(String uri) throws Exception {
+        BrokerService answer = new BrokerService();
+        configureConsumerBroker(answer,uri);
+        answer.start();
+        return answer;
+    }
+    
+    protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception {
+        answer.setBrokerName(BROKER_A_NAME);
+        answer.setPersistent(false);
+        TransportConnector connector = answer.addConnector(uri);
+        connector.setRebalanceClusterClients(true);
+        connector.setUpdateClusterClients(true);
+        answer.setUseShutdownHook(false);
+    }
+    
+    protected BrokerService createBrokerB(String uri) throws Exception {
+        BrokerService answer = new BrokerService();
+        configureNetwork(answer,uri);
+        answer.start();
+        return answer;
+    }
+    
+    protected void configureNetwork(BrokerService answer,String uri) throws Exception {
+        answer.setBrokerName(BROKER_B_NAME);
+        answer.setPersistent(false);
+        NetworkConnector network = answer.addNetworkConnector("static://"+BROKER_A_BIND_ADDRESS);
+        network.setDuplex(true);
+        TransportConnector connector =answer.addConnector(uri);
+        connector.setRebalanceClusterClients(true);
+        connector.setUpdateClusterClients(true);
+        answer.setUseShutdownHook(false);
+    }
+    
+    protected void createClients() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(CLIENT_URL);
+        for (int i =0;i < NUMBER; i++) {
+            ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue  = s.createQueue(getClass().getName());
+            MessageConsumer consumer = s.createConsumer(queue);
+            connections.add(c);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message