activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r563982 [23/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...
Date Wed, 08 Aug 2007 18:58:13 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -35,17 +35,17 @@
     public Transport createTransport(CompositeData compositData) throws IOException {
         Map parameters = new HashMap(compositData.getParameters());
         DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
-        
+
         DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]);
         transport.setDiscoveryAgent(discoveryAgent);
-        IntrospectionSupport.setProperties(transport,parameters);
+        IntrospectionSupport.setProperties(transport, parameters);
 
         return transport;
     }
 
-    public TransportServer doBind(String brokerId,URI location) throws IOException{
-        throw new IOException("Invalid server URI: "+location);
-//        try{
+    public TransportServer doBind(String brokerId, URI location) throws IOException {
+        throw new IOException("Invalid server URI: " + location);
+// try{
 //            CompositeData compositData=URISupport.parseComposite(location);
 //            URI[] components=compositData.getComponents();
 //            if(components.length!=1){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/JmDNSFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/JmDNSFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/JmDNSFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/JmDNSFactory.java Wed Aug  8 11:56:59 2007
@@ -24,34 +24,35 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class JmDNSFactory {
-    
+
     static Map registry = new HashMap();
+
     static class UsageTracker {
         AtomicInteger count = new AtomicInteger(0);
-        JmDNS jmDNS; 
+        JmDNS jmDNS;
     }
-    
+
     static synchronized JmDNS create(final InetAddress address) throws IOException {
         UsageTracker tracker = (UsageTracker)registry.get(address);
-        if( tracker == null ) {
+        if (tracker == null) {
             tracker = new UsageTracker();
             tracker.jmDNS = new JmDNS(address) {
                 public void close() {
-                    if( onClose(address) ) {
+                    if (onClose(address)) {
                         super.close();
                     }
                 }
             };
             registry.put(address, tracker);
-        } 
+        }
         tracker.count.incrementAndGet();
         return tracker.jmDNS;
     }
-    
-    static synchronized boolean onClose(InetAddress address){
-        UsageTracker tracker=(UsageTracker) registry.get(address);
-        if(tracker!=null){
-            if(tracker.count.decrementAndGet()==0){
+
+    static synchronized boolean onClose(InetAddress address) {
+        UsageTracker tracker = (UsageTracker)registry.get(address);
+        if (tracker != null) {
+            if (tracker.count.decrementAndGet() == 0) {
                 registry.remove(address);
                 return true;
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java Wed Aug  8 11:56:59 2007
@@ -41,7 +41,7 @@
 /**
  * A {@link DiscoveryAgent} using <a href="http://www.zeroconf.org/">Zeroconf</a>
  * via the <a href="http://jmdns.sf.net/">jmDNS</a> library
- *
+ * 
  * @version $Revision$
  */
 public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener {
@@ -60,7 +60,7 @@
     private final CopyOnWriteArrayList serviceInfos = new CopyOnWriteArrayList();
 
     // DiscoveryAgent interface
-    //-------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
     public void start() throws Exception {
         if (group == null) {
             throw new IOException("You must specify a group to discover");
@@ -73,23 +73,22 @@
         try {
             // force lazy construction
             getJmdns();
-            if (listener!=null) {
-                log.info("Discovering service of type: " +type);
+            if (listener != null) {
+                log.info("Discovering service of type: " + type);
                 jmdns.addServiceListener(type, this);
             }
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             JMSExceptionSupport.create("Failed to start JmDNS service: " + e, e);
         }
     }
 
     public void stop() {
-        if( jmdns!=null ) {
+        if (jmdns != null) {
             for (Iterator iter = serviceInfos.iterator(); iter.hasNext();) {
-                ServiceInfo si = (ServiceInfo) iter.next();
+                ServiceInfo si = (ServiceInfo)iter.next();
                 jmdns.unregisterService(si);
             }
-            
+
             // Close it down async since this could block for a while.
             final JmDNS closeTarget = jmdns;
             Thread thread = new Thread() {
@@ -97,28 +96,27 @@
                     closeTarget.close();
                 }
             };
-            
+
             thread.setDaemon(true);
             thread.start();
-            
-            jmdns=null;
+
+            jmdns = null;
         }
     }
-    
+
     public void registerService(String name) throws IOException {
         ServiceInfo si = createServiceInfo(name, new HashMap());
         serviceInfos.add(si);
         getJmdns().registerService(si);
     }
 
-
     // ServiceListener interface
-    //-------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
     public void addService(JmDNS jmDNS, String type, String name) {
         if (log.isDebugEnabled()) {
             log.debug("addService with type: " + type + " name: " + name);
         }
-        if( listener!=null ) 
+        if (listener != null)
             listener.onServiceAdd(new DiscoveryEvent(name));
         jmDNS.requestServiceInfo(type, name);
     }
@@ -127,18 +125,21 @@
         if (log.isDebugEnabled()) {
             log.debug("removeService with type: " + type + " name: " + name);
         }
-        if( listener!=null )
+        if (listener != null)
             listener.onServiceRemove(new DiscoveryEvent(name));
     }
 
-	public void serviceAdded(ServiceEvent event) {
-		addService(event.getDNS(), event.getType(), event.getName());
-	}
-	public void serviceRemoved(ServiceEvent event) {
-		removeService(event.getDNS(), event.getType(), event.getName());
-	}
+    public void serviceAdded(ServiceEvent event) {
+        addService(event.getDNS(), event.getType(), event.getName());
+    }
+
+    public void serviceRemoved(ServiceEvent event) {
+        removeService(event.getDNS(), event.getType(), event.getName());
+    }
+
     public void serviceResolved(ServiceEvent event) {
     }
+
     public void resolveService(JmDNS jmDNS, String type, String name, ServiceInfo serviceInfo) {
     }
 
@@ -169,7 +170,6 @@
         this.jmdns = jmdns;
     }
 
-
     public InetAddress getLocalAddress() throws UnknownHostException {
         if (localAddress == null) {
             localAddress = createLocalAddress();
@@ -190,7 +190,7 @@
     }
 
     // Implementation methods
-    //-------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
     protected ServiceInfo createServiceInfo(String name, Map map) {
         int port = MapHelper.getInt(map, "port", 0);
 
@@ -199,7 +199,7 @@
         if (log.isDebugEnabled()) {
             log.debug("Registering service type: " + type + " name: " + name + " details: " + map);
         }
-        return new ServiceInfo(type, name+"."+type, port, weight, priority, "");
+        return new ServiceInfo(type, name + "." + type, port, weight, priority, "");
     }
 
     protected JmDNS createJmDNS() throws IOException {
@@ -222,11 +222,11 @@
     }
 
     public void setGroup(String group) {
-        this.group=group;
+        this.group = group;
     }
 
     public String getType() {
-        return "_" + group+"."+TYPE_SUFFIX;
+        return "_" + group + "." + TYPE_SUFFIX;
     }
 
     public void serviceFailed(DiscoveryEvent event) throws IOException {
@@ -237,8 +237,8 @@
      * @param brokerName
      * @see org.apache.activemq.transport.discovery.DiscoveryAgent#setBrokerName(java.lang.String)
      */
-    public void setBrokerName(String brokerName){
+    public void setBrokerName(String brokerName) {
         // implementation of interface
-        
+
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Wed Aug  8 11:56:59 2007
@@ -26,12 +26,13 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * A simple DiscoveryAgent that allows static configuration of the discovered services.
+ * A simple DiscoveryAgent that allows static configuration of the discovered
+ * services.
  * 
  * @version $Revision$
  */
 public class SimpleDiscoveryAgent implements DiscoveryAgent {
-    
+
     private long initialReconnectDelay = 1000;
     private long maxReconnectDelay = 1000 * 30;
     private long backOffMultiplier = 2;
@@ -44,41 +45,41 @@
     String services[] = new String[] {};
     String group = "DEFAULT";
     private final AtomicBoolean running = new AtomicBoolean(false);
-    
+
     class SimpleDiscoveryEvent extends DiscoveryEvent {
-		
-    	private int connectFailures;
+
+        private int connectFailures;
         private long reconnectDelay = initialReconnectDelay;
         private long connectTime = System.currentTimeMillis();
         private AtomicBoolean failed = new AtomicBoolean(false);
 
         public SimpleDiscoveryEvent(String service) {
-			super(service);
-		}
-        
+            super(service);
+        }
+
     }
-    
+
     public void setDiscoveryListener(DiscoveryListener listener) {
         this.listener = listener;
     }
-    
+
     public void registerService(String name) throws IOException {
     }
-    
+
     public void start() throws Exception {
-    	running.set(true);
+        running.set(true);
         for (int i = 0; i < services.length; i++) {
             listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
         }
     }
-    
+
     public void stop() throws Exception {
-    	running.set(false);
-    	synchronized(sleepMutex) {
-    		sleepMutex.notifyAll();
-    	}
+        running.set(false);
+        synchronized (sleepMutex) {
+            sleepMutex.notifyAll();
+        }
     }
-  
+
     public String[] getServices() {
         return services;
     }
@@ -86,11 +87,11 @@
     public void setServices(String services) {
         this.services = services.split(",");
     }
-    
+
     public void setServices(String services[]) {
         this.services = services;
     }
-    
+
     public void setServices(URI services[]) {
         this.services = new String[services.length];
         for (int i = 0; i < services.length; i++) {
@@ -110,112 +111,112 @@
     }
 
     public void serviceFailed(DiscoveryEvent devent) throws IOException {
-    	
-        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
-        if( event.failed.compareAndSet(false, true) ) {
-        	
-			listener.onServiceRemove(event);
-	    	Thread thread = new Thread() {
-	    		public void run() {
-	
-	
-	    			// We detect a failed connection attempt because the service fails right
-	    			// away.
-	    			if( event.connectTime + minConnectTime > System.currentTimeMillis()  ) {
-	    				
-	    				event.connectFailures++;
-	    				
-	    				if( maxReconnectAttempts>0 &&  event.connectFailures >= maxReconnectAttempts ) {
-	    					// Don' try to re-connect
-	    					return;
-	    				}
-	    				
-		                synchronized(sleepMutex){
-		                    try{
-		                    	if( !running.get() )
-		                    		return;
-		                    	
-		                        sleepMutex.wait(event.reconnectDelay);
-		                    }catch(InterruptedException ie){
+
+        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
+        if (event.failed.compareAndSet(false, true)) {
+
+            listener.onServiceRemove(event);
+            Thread thread = new Thread() {
+                public void run() {
+
+                    // We detect a failed connection attempt because the
+                    // service fails right away, i.e. before minConnectTime
+                    // has elapsed since connectTime.
+                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
+
+                        event.connectFailures++;
+
+                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
+                            // Don't try to re-connect
+                            return;
+                        }
+
+                        synchronized (sleepMutex) {
+                            try {
+                                if (!running.get())
+                                    return;
+
+                                sleepMutex.wait(event.reconnectDelay);
+                            } catch (InterruptedException ie) {
                                 Thread.currentThread().interrupt();
-		                       return;
-		                    }
-		                }
-	
-		                if (!useExponentialBackOff) {
-		                    event.reconnectDelay = initialReconnectDelay;
-		                } else {
-		                    // Exponential increment of reconnect delay.
-		                    event.reconnectDelay*=backOffMultiplier;
-		                    if(event.reconnectDelay>maxReconnectDelay)
-		                        event.reconnectDelay=maxReconnectDelay;
-		                }
-		                
-	    			} else {
-	    				event.connectFailures = 0;
-	                    event.reconnectDelay = initialReconnectDelay;
-	    			}
-	    			                    			
-	            	if( !running.get() )
-	            		return;
-	            	
-	    			event.connectTime = System.currentTimeMillis();
-	    			event.failed.set(false);
-	    			
-	    			listener.onServiceAdd(event);
-	    		}
-	    	};
-	    	thread.setDaemon(true);
-	    	thread.start();
+                                return;
+                            }
+                        }
+
+                        if (!useExponentialBackOff) {
+                            event.reconnectDelay = initialReconnectDelay;
+                        } else {
+                            // Exponential increment of reconnect delay.
+                            event.reconnectDelay *= backOffMultiplier;
+                            if (event.reconnectDelay > maxReconnectDelay)
+                                event.reconnectDelay = maxReconnectDelay;
+                        }
+
+                    } else {
+                        event.connectFailures = 0;
+                        event.reconnectDelay = initialReconnectDelay;
+                    }
+
+                    if (!running.get())
+                        return;
+
+                    event.connectTime = System.currentTimeMillis();
+                    event.failed.set(false);
+
+                    listener.onServiceAdd(event);
+                }
+            };
+            thread.setDaemon(true);
+            thread.start();
         }
     }
 
-	public long getBackOffMultiplier() {
-		return backOffMultiplier;
-	}
-
-	public void setBackOffMultiplier(long backOffMultiplier) {
-		this.backOffMultiplier = backOffMultiplier;
-	}
-
-	public long getInitialReconnectDelay() {
-		return initialReconnectDelay;
-	}
-
-	public void setInitialReconnectDelay(long initialReconnectDelay) {
-		this.initialReconnectDelay = initialReconnectDelay;
-	}
-
-	public int getMaxReconnectAttempts() {
-		return maxReconnectAttempts;
-	}
-
-	public void setMaxReconnectAttempts(int maxReconnectAttempts) {
-		this.maxReconnectAttempts = maxReconnectAttempts;
-	}
-
-	public long getMaxReconnectDelay() {
-		return maxReconnectDelay;
-	}
-
-	public void setMaxReconnectDelay(long maxReconnectDelay) {
-		this.maxReconnectDelay = maxReconnectDelay;
-	}
-
-	public long getMinConnectTime() {
-		return minConnectTime;
-	}
-
-	public void setMinConnectTime(long minConnectTime) {
-		this.minConnectTime = minConnectTime;
-	}
-
-	public boolean isUseExponentialBackOff() {
-		return useExponentialBackOff;
-	}
-
-	public void setUseExponentialBackOff(boolean useExponentialBackOff) {
-		this.useExponentialBackOff = useExponentialBackOff;
-	}
-    
+    public long getBackOffMultiplier() {
+        return backOffMultiplier;
+    }
+
+    public void setBackOffMultiplier(long backOffMultiplier) {
+        this.backOffMultiplier = backOffMultiplier;
+    }
+
+    public long getInitialReconnectDelay() {
+        return initialReconnectDelay;
+    }
+
+    public void setInitialReconnectDelay(long initialReconnectDelay) {
+        this.initialReconnectDelay = initialReconnectDelay;
+    }
+
+    public int getMaxReconnectAttempts() {
+        return maxReconnectAttempts;
+    }
+
+    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+        this.maxReconnectAttempts = maxReconnectAttempts;
+    }
+
+    public long getMaxReconnectDelay() {
+        return maxReconnectDelay;
+    }
+
+    public void setMaxReconnectDelay(long maxReconnectDelay) {
+        this.maxReconnectDelay = maxReconnectDelay;
+    }
+
+    public long getMinConnectTime() {
+        return minConnectTime;
+    }
+
+    public void setMinConnectTime(long minConnectTime) {
+        this.minConnectTime = minConnectTime;
+    }
+
+    public boolean isUseExponentialBackOff() {
+        return useExponentialBackOff;
+    }
+
+    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+        this.useExponentialBackOff = useExponentialBackOff;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Aug  8 11:56:59 2007
@@ -81,122 +81,120 @@
     private Exception connectionFailure;
 
     private final TransportListener myTransportListener = createTransportListener();
-    
+
     TransportListener createTransportListener() {
-    	return new TransportListener() {
-	        public void onCommand(Object o) {
-            	Command command = (Command) o;
-	            if (command == null) {
-	                return;
-	            }
-	            if (command.isResponse()) {
-                    Object object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
-                    if( object!=null && object.getClass() == Tracked.class ) {
-                	   ((Tracked)object).onResponses();
+        return new TransportListener() {
+            public void onCommand(Object o) {
+                Command command = (Command)o;
+                if (command == null) {
+                    return;
+                }
+                if (command.isResponse()) {
+                    Object object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
+                    if (object != null && object.getClass() == Tracked.class) {
+                        ((Tracked)object).onResponses();
                     }
-	            }
-	            if (!initialized){
-	                if (command.isBrokerInfo()){
-	                    BrokerInfo info = (BrokerInfo)command;
-	                    BrokerInfo[] peers = info.getPeerBrokerInfos();
-	                    if (peers!= null){
-	                        for (int i =0; i < peers.length;i++){
-	                            String brokerString = peers[i].getBrokerURL();
-	                            add(brokerString);
-	                        }
-	                    }
-	                initialized = true;
-	                }
-	                
-	            }
-	            if (transportListener != null) {
-	                transportListener.onCommand(command);
-	            }
-	        }
-	
-	        public void onException(IOException error) {
-	            try {
-	                handleTransportFailure(error);
-	            }
-	            catch (InterruptedException e) {
-	                Thread.currentThread().interrupt();
-	                transportListener.onException(new InterruptedIOException());
-	            }
-	        }
-	        
-	        public void transportInterupted(){
-	            if (transportListener != null){
-	                transportListener.transportInterupted();
-	            }
-	        }
-	
-	        public void transportResumed(){
-	            if(transportListener != null){
-	                transportListener.transportResumed();
-	            }
-	        }
-	    };
+                }
+                if (!initialized) {
+                    if (command.isBrokerInfo()) {
+                        BrokerInfo info = (BrokerInfo)command;
+                        BrokerInfo[] peers = info.getPeerBrokerInfos();
+                        if (peers != null) {
+                            for (int i = 0; i < peers.length; i++) {
+                                String brokerString = peers[i].getBrokerURL();
+                                add(brokerString);
+                            }
+                        }
+                        initialized = true;
+                    }
+
+                }
+                if (transportListener != null) {
+                    transportListener.onCommand(command);
+                }
+            }
+
+            public void onException(IOException error) {
+                try {
+                    handleTransportFailure(error);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    transportListener.onException(new InterruptedIOException());
+                }
+            }
+
+            public void transportInterupted() {
+                if (transportListener != null) {
+                    transportListener.transportInterupted();
+                }
+            }
+
+            public void transportResumed() {
+                if (transportListener != null) {
+                    transportListener.transportResumed();
+                }
+            }
+        };
     }
-    
+
     public FailoverTransport() throws InterruptedIOException {
 
-    	stateTracker.setTrackTransactions(true);
-    	
+        stateTracker.setTrackTransactions(true);
+
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
 
             public boolean iterate() {
 
-                Exception failure=null;
+                Exception failure = null;
                 synchronized (reconnectMutex) {
 
-                    if (disposed || connectionFailure!=null) {
+                    if (disposed || connectionFailure != null) {
                         reconnectMutex.notifyAll();
                     }
 
-                    if (connectedTransport != null || disposed || connectionFailure!=null) {
+                    if (connectedTransport != null || disposed || connectionFailure != null) {
                         return false;
                     } else {
                         ArrayList connectList = getConnectList();
-                        if( connectList.isEmpty() ) {
+                        if (connectList.isEmpty()) {
                             failure = new IOException("No uris available to connect to.");
                         } else {
-                            if (!useExponentialBackOff){
+                            if (!useExponentialBackOff) {
                                 reconnectDelay = initialReconnectDelay;
                             }
                             Iterator iter = connectList.iterator();
                             for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) {
-                                URI uri = (URI) iter.next();
+                                URI uri = (URI)iter.next();
                                 try {
                                     log.debug("Attempting connect to: " + uri);
                                     Transport t = TransportFactory.compositeConnect(uri);
                                     t.setTransportListener(myTransportListener);
                                     t.start();
-                                    
+
                                     if (started) {
                                         restoreTransport(t);
                                     }
-                                    
+
                                     log.debug("Connection established");
                                     reconnectDelay = initialReconnectDelay;
                                     connectedTransportURI = uri;
                                     connectedTransport = t;
                                     reconnectMutex.notifyAll();
                                     connectFailures = 0;
-                                    if (transportListener != null){
+                                    if (transportListener != null) {
                                         transportListener.transportResumed();
                                     }
                                     log.info("Successfully reconnected to " + uri);
                                     return false;
-                                }
-                                catch (Exception e) {
+                                } catch (Exception e) {
                                     failure = e;
                                     log.debug("Connect fail to: " + uri + ", reason: " + e);
                                 }
                             }
                         }
                     }
-                    
+
                     if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
                         log.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                         connectionFailure = failure;
@@ -205,33 +203,32 @@
                     }
                 }
 
-                if(!disposed){
-                    
-                        log.debug("Waiting "+reconnectDelay+" ms before attempting connection. ");
-                        synchronized(sleepMutex){
-                            try{
-                                sleepMutex.wait(reconnectDelay);
-                            }catch(InterruptedException e){
-                               Thread.currentThread().interrupt();
-                            }
+                if (!disposed) {
+
+                    log.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+                    synchronized (sleepMutex) {
+                        try {
+                            sleepMutex.wait(reconnectDelay);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
                         }
-                        
-                    
-                    if(useExponentialBackOff){
+                    }
+
+                    if (useExponentialBackOff) {
                         // Exponential increment of reconnect delay.
-                        reconnectDelay*=backOffMultiplier;
-                        if(reconnectDelay>maxReconnectDelay)
-                            reconnectDelay=maxReconnectDelay;
+                        reconnectDelay *= backOffMultiplier;
+                        if (reconnectDelay > maxReconnectDelay)
+                            reconnectDelay = maxReconnectDelay;
                     }
                 }
                 return !disposed;
             }
 
-        }, "ActiveMQ Failover Worker: "+System.identityHashCode(this));
+        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
     }
 
     final void handleTransportFailure(IOException e) throws InterruptedException {
-        if (transportListener != null){
+        if (transportListener != null) {
             transportListener.transportInterupted();
         }
         synchronized (reconnectMutex) {
@@ -268,11 +265,11 @@
 
             if (connectedTransport != null) {
                 connectedTransport.stop();
-                connectedTransport=null;
+                connectedTransport = null;
             }
             reconnectMutex.notifyAll();
         }
-        synchronized(sleepMutex){
+        synchronized (sleepMutex) {
             sleepMutex.notifyAll();
         }
         reconnectTask.shutdown();
@@ -329,45 +326,44 @@
     /**
      * @return Returns the randomize.
      */
-    public boolean isRandomize(){
+    public boolean isRandomize() {
         return randomize;
     }
 
     /**
      * @param randomize The randomize to set.
      */
-    public void setRandomize(boolean randomize){
-        this.randomize=randomize;
+    public void setRandomize(boolean randomize) {
+        this.randomize = randomize;
     }
 
     public void oneway(Object o) throws IOException {
-    	Command command = (Command) o;
+        Command command = (Command)o;
         Exception error = null;
         try {
 
             synchronized (reconnectMutex) {
                 // Keep trying until the message is sent.
-                for (int i = 0;!disposed; i++) {
+                for (int i = 0; !disposed; i++) {
                     try {
 
                         // Wait for transport to be connected.
-                        while (connectedTransport == null && !disposed && connectionFailure==null ) {
+                        while (connectedTransport == null && !disposed && connectionFailure == null) {
                             log.trace("Waiting for transport to reconnect.");
                             try {
                                 reconnectMutex.wait(1000);
-                            }
-                            catch (InterruptedException e) {
+                            } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
                                 log.debug("Interupted: " + e, e);
                             }
                         }
 
-                        if( connectedTransport==null ) {
+                        if (connectedTransport == null) {
                             // Previous loop may have exited due to use being
                             // disposed.
                             if (disposed) {
                                 error = new IOException("Transport disposed.");
-                            } else if (connectionFailure!=null) {
+                            } else if (connectionFailure != null) {
                                 error = connectionFailure;
                             } else {
                                 error = new IOException("Unexpected failure.");
@@ -380,51 +376,53 @@
                         // then hold it in the requestMap so that we can replay
                         // it later.
                         Tracked tracked = stateTracker.track(command);
-                        if( tracked!=null && tracked.isWaitingForResponse() ) {
+                        if (tracked != null && tracked.isWaitingForResponse()) {
                             requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
-                        } else if ( tracked==null && command.isResponseRequired()) {
+                        } else if (tracked == null && command.isResponseRequired()) {
                             requestMap.put(Integer.valueOf(command.getCommandId()), command);
                         }
-                                                
+
                         // Send the message.
                         try {
                             connectedTransport.oneway(command);
                         } catch (IOException e) {
-                        	
-                        	// If the command was not tracked.. we will retry in this method
-                        	if( tracked==null ) {
-                        		
-                        		// since we will retry in this method.. take it out of the request
-                        		// map so that it is not sent 2 times on recovery
-                            	if( command.isResponseRequired() ) {
-                            		requestMap.remove(Integer.valueOf(command.getCommandId()));
-                            	}
-                            	
-                                // Rethrow the exception so it will handled by the outer catch
+
+                            // If the command was not tracked.. we will retry in
+                            // this method
+                            if (tracked == null) {
+
+                                // since we will retry in this method.. take it
+                                // out of the request
+                                // map so that it is not sent 2 times on
+                                // recovery
+                                if (command.isResponseRequired()) {
+                                    requestMap.remove(Integer.valueOf(command.getCommandId()));
+                                }
+
+                                // Rethrow the exception so it will handled by
+                                // the outer catch
                                 throw e;
-                        	}
-                        	
+                            }
+
                         }
-                        
+
                         return;
 
-                    }
-                    catch (IOException e) {
+                    } catch (IOException e) {
                         log.debug("Send oneway attempt: " + i + " failed.");
                         handleTransportFailure(e);
                     }
                 }
             }
-        }
-        catch (InterruptedException e) {
+        } catch (InterruptedException e) {
             // Some one may be trying to stop our thread.
             Thread.currentThread().interrupt();
             throw new InterruptedIOException();
         }
-        if(!disposed){
-            if(error!=null){
-                if(error instanceof IOException)
-                    throw (IOException) error;
+        if (!disposed) {
+            if (error != null) {
+                if (error instanceof IOException)
+                    throw (IOException)error;
                 throw IOExceptionSupport.create(error);
             }
         }
@@ -437,14 +435,14 @@
     public Object request(Object command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
-    
-    public Object request(Object command,int timeout) throws IOException {
+
+    public Object request(Object command, int timeout) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
     public void add(URI u[]) {
         for (int i = 0; i < u.length; i++) {
-            if( !uris.contains(u[i]) )
+            if (!uris.contains(u[i]))
                 uris.add(u[i]);
         }
         reconnect();
@@ -456,20 +454,19 @@
         }
         reconnect();
     }
-    
-    public void add(String u){
+
+    public void add(String u) {
         try {
-        URI uri = new URI(u);
-        if (!uris.contains(uri))
-            uris.add(uri);
+            URI uri = new URI(u);
+            if (!uris.contains(uri))
+                uris.add(uri);
 
-        reconnect();
-        }catch(Exception e){
+            reconnect();
+        } catch (Exception e) {
             log.error("Failed to parse URI: " + u);
         }
     }
 
-
     public void reconnect() {
         log.debug("Waking up reconnect task");
         try {
@@ -479,17 +476,17 @@
         }
     }
 
-    private ArrayList getConnectList(){
-        ArrayList l=new ArrayList(uris);
-        if (randomize){
+    private ArrayList getConnectList() {
+        ArrayList l = new ArrayList(uris);
+        if (randomize) {
             // Randomly, reorder the list by random swapping
-            Random r=new Random();
+            Random r = new Random();
             r.setSeed(System.currentTimeMillis());
-            for (int i=0;i<l.size();i++){
-                int p=r.nextInt(l.size());
-                Object t=l.get(p);
-                l.set(p,l.get(i));
-                l.set(i,t);
+            for (int i = 0; i < l.size(); i++) {
+                int p = r.nextInt(l.size());
+                Object t = l.get(p);
+                l.set(p, l.get(i));
+                l.set(i, t);
             }
         }
         return l;
@@ -521,7 +518,7 @@
         t.start();
         stateTracker.restore(t);
         for (Iterator iter2 = requestMap.values().iterator(); iter2.hasNext();) {
-            Command command = (Command) iter2.next();
+            Command command = (Command)iter2.next();
             t.oneway(command);
         }
     }
@@ -535,17 +532,17 @@
     }
 
     public String toString() {
-        return connectedTransportURI==null ? "unconnected" : connectedTransportURI.toString();
+        return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
+    }
+
+    public String getRemoteAddress() {
+        if (connectedTransport != null) {
+            return connectedTransport.getRemoteAddress();
+        }
+        return null;
     }
 
-	public String getRemoteAddress() {
-		if(connectedTransport != null){
-			return connectedTransport.getRemoteAddress();
-		}
-		return null;
-	}
-    
-    public boolean isFaultTolerant(){
+    public boolean isFaultTolerant() {
         return true;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -35,25 +35,25 @@
     public Transport doConnect(URI location) throws IOException {
         try {
             Transport transport = createTransport(URISupport.parseComposite(location));
-            transport =  new MutexTransport(transport);
+            transport = new MutexTransport(transport);
             transport = new ResponseCorrelator(transport);
             return transport;
         } catch (URISyntaxException e) {
-            throw new IOException("Invalid location: "+location);
+            throw new IOException("Invalid location: " + location);
         }
     }
-    
+
     public Transport doCompositeConnect(URI location) throws IOException {
         try {
             return createTransport(URISupport.parseComposite(location));
         } catch (URISyntaxException e) {
-            throw new IOException("Invalid location: "+location);
+            throw new IOException("Invalid location: " + location);
         }
     }
 
     /**
      * @param location
-     * @return 
+     * @return
      * @throws IOException
      */
     public Transport createTransport(CompositeData compositData) throws IOException {
@@ -72,8 +72,8 @@
         return transport;
     }
 
-    public TransportServer doBind(String brokerId,URI location) throws IOException {
-        throw new IOException("Invalid server URI: "+location);
+    public TransportServer doBind(String brokerId, URI location) throws IOException {
+        throw new IOException("Invalid server URI: " + location);
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Wed Aug  8 11:56:59 2007
@@ -64,37 +64,37 @@
 
     private final TaskRunner reconnectTask;
     private boolean started;
-    
+
     private ArrayList transports = new ArrayList();
-    private int connectedCount=0;
-    
+    private int connectedCount = 0;
+
     private int minAckCount = 2;
-    
+
     private long initialReconnectDelay = 10;
     private long maxReconnectDelay = 1000 * 30;
-    private long backOffMultiplier = 2;    
+    private long backOffMultiplier = 2;
     private boolean useExponentialBackOff = true;
     private int maxReconnectAttempts;
     private Exception connectionFailure;
     private FanoutTransportHandler primary;
-    
+
     static class RequestCounter {
-        
+
         final Command command;
         final AtomicInteger ackCount;
-        
+
         RequestCounter(Command command, int count) {
             this.command = command;
             this.ackCount = new AtomicInteger(count);
         }
-        
+
         public String toString() {
-            return command.getCommandId()+"="+ackCount.get();
+            return command.getCommandId() + "=" + ackCount.get();
         }
     }
 
     class FanoutTransportHandler extends DefaultTransportListener {
-        
+
         private final URI uri;
         private Transport transport;
 
@@ -103,16 +103,16 @@
         private long reconnectDate;
 
         public FanoutTransportHandler(URI uri) {
-            this.uri=uri;
+            this.uri = uri;
         }
 
         public void onCommand(Object o) {
-        	Command command = (Command) o;
+            Command command = (Command)o;
             if (command.isResponse()) {
-                Integer id = new Integer(((Response) command).getCorrelationId());
-                RequestCounter rc = (RequestCounter) requestMap.get(id);
-                if( rc != null ) {
-                    if( rc.ackCount.decrementAndGet() <= 0 ) {
+                Integer id = new Integer(((Response)command).getCorrelationId());
+                RequestCounter rc = (RequestCounter)requestMap.get(id);
+                if (rc != null) {
+                    if (rc.ackCount.decrementAndGet() <= 0) {
                         requestMap.remove(id);
                         transportListenerOnCommand(command);
                     }
@@ -127,27 +127,26 @@
         public void onException(IOException error) {
             try {
                 synchronized (reconnectMutex) {
-                    if( transport == null )
+                    if (transport == null)
                         return;
-                    
+
                     log.debug("Transport failed, starting up reconnect task", error);
-                    
+
                     ServiceSupport.dispose(transport);
-                    transport=null;
+                    transport = null;
                     connectedCount--;
-                    if( primary == this) {
+                    if (primary == this) {
                         primary = null;
                     }
                     reconnectTask.wakeup();
                 }
-            }
-            catch (InterruptedException e) {
+            } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 if (transportListener != null) {
                     transportListener.onException(new InterruptedIOException());
                 }
             }
-        }        
+        }
     }
 
     public FanoutTransport() throws InterruptedIOException {
@@ -156,58 +155,58 @@
             public boolean iterate() {
                 return doConnect();
             }
-        }, "ActiveMQ Fanout Worker: "+System.identityHashCode(this));
+        }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
     }
-    
+
     /**
      * @return
      */
     private boolean doConnect() {
-        long closestReconnectDate=0;
+        long closestReconnectDate = 0;
         synchronized (reconnectMutex) {
 
-            if (disposed || connectionFailure!=null) {
+            if (disposed || connectionFailure != null) {
                 reconnectMutex.notifyAll();
             }
 
-            if (transports.size() == connectedCount || disposed || connectionFailure!=null) {
+            if (transports.size() == connectedCount || disposed || connectionFailure != null) {
                 return false;
             } else {
-                
-                if( transports.isEmpty() ) {
-//                    connectionFailure = new IOException("No uris available to connect to.");
+
+                if (transports.isEmpty()) {
+                    // connectionFailure = new IOException("No uris available to
+                    // connect to.");
                 } else {
-                    
-                    
+
                     // Try to connect them up.
                     Iterator iter = transports.iterator();
                     for (int i = 0; iter.hasNext() && !disposed; i++) {
-                        
+
                         long now = System.currentTimeMillis();
-                        
-                        FanoutTransportHandler fanoutHandler = (FanoutTransportHandler) iter.next();
-                        if( fanoutHandler.transport!=null ) {
+
+                        FanoutTransportHandler fanoutHandler = (FanoutTransportHandler)iter.next();
+                        if (fanoutHandler.transport != null) {
                             continue;
                         }
-                        
+
                         // Are we waiting a little to try to reconnect this one?
-                        if( fanoutHandler.reconnectDate!=0 && fanoutHandler.reconnectDate>now ) {
-                            if( closestReconnectDate==0 || fanoutHandler.reconnectDate < closestReconnectDate ) {
+                        if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
+                            if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
                                 closestReconnectDate = fanoutHandler.reconnectDate;
                             }
                             continue;
                         }
-                        
+
                         URI uri = fanoutHandler.uri;
                         try {
-                            log.debug("Stopped: "+this);
+                            log.debug("Stopped: " + this);
                             log.debug("Attempting connect to: " + uri);
                             Transport t = TransportFactory.compositeConnect(uri);
                             log.debug("Connection established");
                             fanoutHandler.transport = t;
                             fanoutHandler.reconnectDelay = 10;
                             fanoutHandler.connectFailures = 0;
-                            if( primary == null ) {
+                            if (primary == null) {
                                 primary = fanoutHandler;
                             }
                             t.setTransportListener(fanoutHandler);
@@ -215,50 +214,48 @@
                             if (started) {
                                 restoreTransport(fanoutHandler);
                             }
-                        }
-                        catch (Exception e) {
+                        } catch (Exception e) {
                             log.debug("Connect fail to: " + uri + ", reason: " + e);
-                            
+
                             if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
                                 log.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
                                 connectionFailure = e;
                                 reconnectMutex.notifyAll();
                                 return false;
                             } else {
-                                
+
                                 if (useExponentialBackOff) {
                                     // Exponential increment of reconnect delay.
                                     fanoutHandler.reconnectDelay *= backOffMultiplier;
                                     if (fanoutHandler.reconnectDelay > maxReconnectDelay)
                                         fanoutHandler.reconnectDelay = maxReconnectDelay;
                                 }
-                                
+
                                 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
-                                
-                                if( closestReconnectDate==0 || fanoutHandler.reconnectDate < closestReconnectDate ) {
+
+                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
                                     closestReconnectDate = fanoutHandler.reconnectDate;
                                 }
                             }
                         }
                     }
-                    if (transports.size() == connectedCount || disposed ) {
+                    if (transports.size() == connectedCount || disposed) {
                         reconnectMutex.notifyAll();
                         return false;
                     }
-                    
+
                 }
             }
-            
+
         }
 
         try {
             long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
-            if(reconnectDelay>0) {
+            if (reconnectDelay > 0) {
                 log.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
                 Thread.sleep(reconnectDelay);
             }
-        }
-        catch (InterruptedException e1) {
+        } catch (InterruptedException e1) {
             Thread.currentThread().interrupt();
         }
         return true;
@@ -271,8 +268,8 @@
                 return;
             started = true;
             for (Iterator iter = transports.iterator(); iter.hasNext();) {
-                FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
-                if( th.transport != null ) {
+                FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
+                if (th.transport != null) {
                     restoreTransport(th);
                 }
             }
@@ -281,21 +278,21 @@
 
     public void stop() throws Exception {
         synchronized (reconnectMutex) {
-        	ServiceStopper ss = new ServiceStopper();
-        	
+            ServiceStopper ss = new ServiceStopper();
+
             if (!started)
                 return;
             started = false;
             disposed = true;
 
             for (Iterator iter = transports.iterator(); iter.hasNext();) {
-                FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
-                if( th.transport != null ) {
-                	ss.stop(th.transport);
+                FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
+                if (th.transport != null) {
+                    ss.stop(th.transport);
                 }
             }
-            
-            log.debug("Stopped: "+this);
+
+            log.debug("Stopped: " + this);
             ss.throwFirstException();
         }
         reconnectTask.shutdown();
@@ -334,50 +331,50 @@
     }
 
     public void oneway(Object o) throws IOException {
-    	final Command command = (Command) o;
+        final Command command = (Command)o;
         try {
             synchronized (reconnectMutex) {
-                
+
                 // If it was a request and it was not being tracked by
                 // the state tracker,
                 // then hold it in the requestMap so that we can replay
                 // it later.
                 boolean fanout = isFanoutCommand(command);
-                if (stateTracker.track(command)==null && command.isResponseRequired() ) {
+                if (stateTracker.track(command) == null && command.isResponseRequired()) {
                     int size = fanout ? minAckCount : 1;
                     requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
                 }
 
                 // Wait for transport to be connected.
-                while (connectedCount != minAckCount && !disposed && connectionFailure==null ) {
-                    log.debug("Waiting for at least "+minAckCount+" transports to be connected.");
+                while (connectedCount != minAckCount && !disposed && connectionFailure == null) {
+                    log.debug("Waiting for at least " + minAckCount + " transports to be connected.");
                     reconnectMutex.wait(1000);
                 }
 
                 // Still not fully connected.
-                if( connectedCount != minAckCount ) {
+                if (connectedCount != minAckCount) {
 
                     Exception error;
-                    
+
                     // Throw the right kind of error..
                     if (disposed) {
                         error = new IOException("Transport disposed.");
-                    } else if (connectionFailure!=null) {
+                    } else if (connectionFailure != null) {
                         error = connectionFailure;
                     } else {
                         error = new IOException("Unexpected failure.");
                     }
-                    
-                    if( error instanceof IOException )
+
+                    if (error instanceof IOException)
                         throw (IOException)error;
                     throw IOExceptionSupport.create(error);
                 }
-                
+
                 // Send the message.
-                if( fanout ) {
+                if (fanout) {
                     for (Iterator iter = transports.iterator(); iter.hasNext();) {
-                        FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
-                        if( th.transport!=null ) {
+                        FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
+                        if (th.transport != null) {
                             try {
                                 th.transport.oneway(command);
                             } catch (IOException e) {
@@ -394,7 +391,7 @@
                         primary.onException(e);
                     }
                 }
-                
+
             }
         } catch (InterruptedException e) {
             // Some one may be trying to stop our thread.
@@ -408,10 +405,10 @@
      * @return
      */
     private boolean isFanoutCommand(Command command) {
-        if( command.isMessage() ) {
+        if (command.isMessage()) {
             return ((Message)command).getDestination().isTopic();
-        } 
-        if( command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ) {
+        }
+        if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
             return false;
         }
         return true;
@@ -424,8 +421,8 @@
     public Object request(Object command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
-    
-    public Object request(Object command,int timeout) throws IOException {
+
+    public Object request(Object command, int timeout) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
@@ -451,67 +448,67 @@
         if (target.isAssignableFrom(getClass())) {
             return this;
         }
-        
+
         synchronized (reconnectMutex) {
             for (Iterator iter = transports.iterator(); iter.hasNext();) {
-                FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
-                if( th.transport!=null ) {
+                FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
+                if (th.transport != null) {
                     Object rc = th.transport.narrow(target);
-                    if( rc !=null )
+                    if (rc != null)
                         return rc;
                 }
             }
         }
-        
+
         return null;
 
     }
 
     protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
         th.transport.start();
-        stateTracker.setRestoreConsumers(th.transport==primary);
+        stateTracker.setRestoreConsumers(th.transport == primary);
         stateTracker.restore(th.transport);
         for (Iterator iter2 = requestMap.values().iterator(); iter2.hasNext();) {
-            RequestCounter rc = (RequestCounter) iter2.next();
+            RequestCounter rc = (RequestCounter)iter2.next();
             th.transport.oneway(rc.command);
         }
     }
 
     public void add(URI uris[]) {
-        
+
         synchronized (reconnectMutex) {
             for (int i = 0; i < uris.length; i++) {
                 URI uri = uris[i];
-                
-                boolean match=false;
+
+                boolean match = false;
                 for (Iterator iter = transports.iterator(); iter.hasNext();) {
-                    FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
-                    if( th.uri.equals(uri)) {
-                        match=true;
+                    FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
+                    if (th.uri.equals(uri)) {
+                        match = true;
                         break;
                     }
                 }
-                if( !match ) {
+                if (!match) {
                     FanoutTransportHandler th = new FanoutTransportHandler(uri);
                     transports.add(th);
                     reconnect();
                 }
             }
         }
-        
+
     }
-    
+
     public void remove(URI uris[]) {
-        
+
         synchronized (reconnectMutex) {
             for (int i = 0; i < uris.length; i++) {
                 URI uri = uris[i];
-                
-                boolean match=false;
+
+                boolean match = false;
                 for (Iterator iter = transports.iterator(); iter.hasNext();) {
-                    FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
-                    if( th.uri.equals(uri)) {
-                        if( th.transport!=null ) {
+                    FanoutTransportHandler th = (FanoutTransportHandler)iter.next();
+                    if (th.uri.equals(uri)) {
+                        if (th.transport != null) {
                             ServiceSupport.dispose(th.transport);
                             connectedCount--;
                         }
@@ -521,17 +518,17 @@
                 }
             }
         }
-        
+
     }
 
-	public String getRemoteAddress() {
-		if(primary != null){
-		   if(primary.transport != null){
-			   return primary.transport.getRemoteAddress(); 
-		   }
-		}
-		return null;
-	}
+    public String getRemoteAddress() {
+        if (primary != null) {
+            if (primary.transport != null) {
+                return primary.transport.getRemoteAddress();
+            }
+        }
+        return null;
+    }
 
     protected void transportListenerOnCommand(Command command) {
         if (transportListener != null) {
@@ -539,8 +536,7 @@
         }
     }
 
-    
-    public boolean isFaultTolerant(){
+    public boolean isFaultTolerant() {
         return true;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -39,37 +39,37 @@
     public Transport doConnect(URI location) throws IOException {
         try {
             Transport transport = createTransport(location);
-            transport =  new MutexTransport(transport);
+            transport = new MutexTransport(transport);
             transport = new ResponseCorrelator(transport);
             return transport;
         } catch (URISyntaxException e) {
-            throw new IOException("Invalid location: "+location);
+            throw new IOException("Invalid location: " + location);
         }
     }
-    
+
     public Transport doCompositeConnect(URI location) throws IOException {
         try {
             return createTransport(location);
         } catch (URISyntaxException e) {
-            throw new IOException("Invalid location: "+location);
+            throw new IOException("Invalid location: " + location);
         }
     }
 
     /**
      * @param location
-     * @return 
+     * @return
      * @throws IOException
-     * @throws URISyntaxException 
+     * @throws URISyntaxException
      */
     public Transport createTransport(URI location) throws IOException, URISyntaxException {
-        
+
         CompositeData compositData = URISupport.parseComposite(location);
         Map parameters = new HashMap(compositData.getParameters());
         DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
-        
+
         DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]);
         transport.setDiscoveryAgent(discoveryAgent);
-        
+
         return transport;
 
     }
@@ -80,8 +80,8 @@
         return transport;
     }
 
-    public TransportServer doBind(String brokerId,URI location) throws IOException {
-        throw new IOException("Invalid server URI: "+location);
+    public TransportServer doBind(String brokerId, URI location) throws IOException {
+        throw new IOException("Invalid server URI: " + location);
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Wed Aug  8 11:56:59 2007
@@ -25,7 +25,6 @@
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
 
-
 /**
  * @version $Revision: 1.5 $
  */
@@ -48,15 +47,14 @@
             getNext().setTransportListener(this);
     }
 
-
     /**
      * @see org.apache.activemq.Service#start()
      * @throws IOException if the next channel has not been set.
      */
     public void start() throws Exception {
-        if( getNext() == null )
+        if (getNext() == null)
             throw new IOException("The next channel has not been set.");
-        if( transportListener == null )
+        if (transportListener == null)
             throw new IOException("The command listener has not been set.");
         getNext().start();
     }
@@ -66,7 +64,7 @@
      */
     public void stop() throws Exception {
         getNext().stop();
-    }    
+    }
 
     public void onCommand(Object command) {
         getTransportListener().onCommand(command);
@@ -85,7 +83,7 @@
     synchronized public TransportListener getTransportListener() {
         return transportListener;
     }
-    
+
     public String toString() {
         return getNext().toString();
     }
@@ -101,8 +99,8 @@
     public Object request(Object command) throws IOException {
         return getNext().request(command);
     }
-    
-    public Object request(Object command,int timeout) throws IOException {
+
+    public Object request(Object command, int timeout) throws IOException {
         return getNext().request(command, timeout);
     }
 
@@ -111,7 +109,7 @@
     }
 
     public Object narrow(Class target) {
-        if( target.isAssignableFrom(getClass()) ) {
+        if (target.isAssignableFrom(getClass())) {
             return this;
         }
         return getNext().narrow(target);
@@ -127,15 +125,15 @@
         setNext(filter);
     }
 
-	public String getRemoteAddress() {
-		return getNext().getRemoteAddress();
-	}
+    public String getRemoteAddress() {
+        return getNext().getRemoteAddress();
+    }
 
     /**
      * @see org.apache.activemq.transport.Transport#isFaultTolerant()
      */
-    public boolean isFaultTolerant(){
-       return getNext().isFaultTolerant();
-    }  
-    
+    public boolean isFaultTolerant() {
+        return getNext().isFaultTolerant();
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -33,27 +33,27 @@
 
     public Transport doConnect(URI location) throws URISyntaxException, Exception {
         Transport transport = createTransport(URISupport.parseComposite(location));
-        transport =  new MutexTransport(transport);
+        transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
         return transport;
     }
-    
+
     public Transport doCompositeConnect(URI location) throws URISyntaxException, Exception {
         return createTransport(URISupport.parseComposite(location));
     }
-    
+
     /**
      * @param compositData
-     * @return 
-     * @throws Exception 
+     * @return
+     * @throws Exception
      */
     public Transport createTransport(CompositeData compositData) throws Exception {
-        MockTransport transport = new MockTransport( TransportFactory.compositeConnect(compositData.getComponents()[0]) );
+        MockTransport transport = new MockTransport(TransportFactory.compositeConnect(compositData.getComponents()[0]));
         IntrospectionSupport.setProperties(transport, compositData.getParameters());
         return transport;
     }
 
-    public TransportServer doBind(String brokerId,URI location) throws IOException {
+    public TransportServer doBind(String brokerId, URI location) throws IOException {
         throw new IOException("This protocol does not support being bound.");
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java Wed Aug  8 11:56:59 2007
@@ -20,54 +20,55 @@
 import java.io.InputStream;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+
 /**
  * An optimized buffered input stream for Tcp
  * 
  * @version $Revision: 1.1.1.1 $
  */
 public class NIOInputStream extends InputStream {
-	
+
     protected int count;
     protected int position;
-	private final ByteBuffer in;
+    private final ByteBuffer in;
 
-    public NIOInputStream(ByteBuffer in){
-		this.in = in;
+    public NIOInputStream(ByteBuffer in) {
+        this.in = in;
     }
 
     public int read() throws IOException {
-    	try {
-    		int rc = in.get()& 0xff; 
-    		return rc;
-    	} catch ( BufferUnderflowException e ) {
-    		return -1;
-    	}
-    }
-
-    public int read(byte b[],int off,int len) throws IOException{
-    	if( in.hasRemaining() ) {
-	    	int rc = Math.min(len, in.remaining());
-	    	in.get(b, off, rc);
-	    	return rc;
-    	} else {
-    		return len == 0 ? 0 : -1; 
-    	}
-    }
-
-    public long skip(long n) throws IOException{
-    	int rc = Math.min((int)n, in.remaining());
-    	in.position(in.position()+rc);
+        try {
+            int rc = in.get() & 0xff;
+            return rc;
+        } catch (BufferUnderflowException e) {
+            return -1;
+        }
+    }
+
+    public int read(byte b[], int off, int len) throws IOException {
+        if (in.hasRemaining()) {
+            int rc = Math.min(len, in.remaining());
+            in.get(b, off, rc);
+            return rc;
+        } else {
+            return len == 0 ? 0 : -1;
+        }
+    }
+
+    public long skip(long n) throws IOException {
+        int rc = Math.min((int)n, in.remaining());
+        in.position(in.position() + rc);
         return rc;
     }
 
-    public int available() throws IOException{
+    public int available() throws IOException {
         return in.remaining();
     }
 
-    public boolean markSupported(){
+    public boolean markSupported() {
         return false;
     }
 
-    public void close() throws IOException{
+    public void close() throws IOException {
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Wed Aug  8 11:56:59 2007
@@ -25,24 +25,24 @@
 
 /**
  * An optimized buffered outputstream for Tcp
- *
+ * 
  * @version $Revision: 1.1.1.1 $
  */
 
 public class NIOOutputStream extends OutputStream {
-	
-	private final static int BUFFER_SIZE = 8192;
 
-	private final WritableByteChannel out;
+    private final static int BUFFER_SIZE = 8192;
+
+    private final WritableByteChannel out;
     private final byte[] buffer;
-	private final ByteBuffer byteBuffer;
-    
+    private final ByteBuffer byteBuffer;
+
     private int count;
     private boolean closed;
 
     /**
      * Constructor
-     *
+     * 
      * @param out
      */
     public NIOOutputStream(WritableByteChannel out) {
@@ -50,16 +50,16 @@
     }
 
     /**
-     * Creates a new buffered output stream to write data to the specified underlying output stream with the specified
-     * buffer size.
-     *
-     * @param out  the underlying output stream.
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     * 
+     * @param out the underlying output stream.
      * @param size the buffer size.
      * @throws IllegalArgumentException if size <= 0.
      */
     public NIOOutputStream(WritableByteChannel out, int size) {
         this.out = out;
-		if (size <= 0) {
+        if (size <= 0) {
             throw new IllegalArgumentException("Buffer size <= 0");
         }
         buffer = new byte[size];
@@ -68,7 +68,7 @@
 
     /**
      * write a byte on to the stream
-     *
+     * 
      * @param b - byte to write
      * @throws IOException
      */
@@ -77,14 +77,13 @@
         if (availableBufferToWrite() < 1) {
             flush();
         }
-        buffer[count++] = (byte) b;
+        buffer[count++] = (byte)b;
     }
 
-
     /**
      * write a byte array to the stream
-     *
-     * @param b   the byte buffer
+     * 
+     * @param b the byte buffer
      * @param off the offset into the buffer
      * @param len the length of data to write
      * @throws IOException
@@ -97,23 +96,22 @@
         if (buffer.length >= len) {
             System.arraycopy(b, off, buffer, count, len);
             count += len;
-        }
-        else {
-        	write( ByteBuffer.wrap(b, off, len));
+        } else {
+            write(ByteBuffer.wrap(b, off, len));
         }
     }
 
-	/**
-     * flush the data to the output stream
-     * This doesn't call flush on the underlying outputstream, because
-     * Tcp is particularly efficent at doing this itself ....
-     *
+    /**
+     * flush the data to the output stream. This doesn't call flush on the
+     * underlying outputstream, because Tcp is particularly efficient at doing
+     * this itself ....
+     * 
      * @throws IOException
      */
     public void flush() throws IOException {
         if (count > 0 && out != null) {
-        	byteBuffer.position(0);
-        	byteBuffer.limit(count);
+            byteBuffer.position(0);
+            byteBuffer.limit(count);
             write(byteBuffer);
             count = 0;
         }
@@ -121,7 +119,7 @@
 
     /**
      * close this stream
-     *
+     * 
      * @throws IOException
      */
     public void close() throws IOException {
@@ -129,10 +127,9 @@
         closed = true;
     }
 
-
     /**
      * Checks that the stream has not been closed
-     *
+     * 
      * @throws IOException
      */
     protected void checkClosed() throws IOException {
@@ -147,35 +144,36 @@
     private int availableBufferToWrite() {
         return buffer.length - count;
     }
-    
+
     protected void write(ByteBuffer data) throws IOException {
-        int remaining = data.remaining();        
-        int lastRemaining = remaining-1;
-        long delay=1;
-        while( remaining > 0 ) {
-        	
-	        // We may need to do a little bit of sleeping to avoid a busy loop.
-            // Slow down if no data was written out.. 
-	        if( remaining == lastRemaining ) {
-	            try {
+        int remaining = data.remaining();
+        int lastRemaining = remaining - 1;
+        long delay = 1;
+        while (remaining > 0) {
+
+            // We may need to do a little bit of sleeping to avoid a busy loop.
+            // Slow down if no data was written out..
+            if (remaining == lastRemaining) {
+                try {
                     // Use exponential backoff to increase sleep time.
                     Thread.sleep(delay);
                     delay *= 2;
-                    if( delay > 1000 ) {
+                    if (delay > 1000) {
                         delay = 1000;
                     }
                 } catch (InterruptedException e) {
                     throw new InterruptedIOException();
-                }                        
-	        } else {
-	            delay = 1;
-	        }        	        
-	        lastRemaining = remaining;
-	        
-            // Since the write is non-blocking, all the data may not have been written.
-            out.write( data );        
-            remaining = data.remaining();        	        
-        }    
-	}
-    
+                }
+            } else {
+                delay = 1;
+            }
+            lastRemaining = remaining;
+
+            // Since the write is non-blocking, all the data may not have been
+            // written.
+            out.write(data);
+            remaining = data.remaining();
+        }
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Wed Aug  8 11:56:59 2007
@@ -43,113 +43,112 @@
  */
 public class NIOTransport extends TcpTransport {
 
-	//private static final Log log = LogFactory.getLog(NIOTransport.class);
-	private SocketChannel channel;
-	private SelectorSelection selection;
-	private ByteBuffer inputBuffer;
-	private ByteBuffer currentBuffer;
-	private int nextFrameSize;
-
-	public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
-		super(wireFormat, socketFactory, remoteLocation, localLocation);
-	}
-
-	public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
-		super(wireFormat, socket);
-	}
-
-	protected void initializeStreams() throws IOException {
-		channel = socket.getChannel();		
-		channel.configureBlocking(false);
-		
-		// listen for events telling us when the socket is readable.
-		selection = SelectorManager.getInstance().register(channel,
-				new SelectorManager.Listener() {
-					public void onSelect(SelectorSelection selection) {
-						serviceRead();
-					}
-					public void onError(SelectorSelection selection, Throwable error) {
-						if( error instanceof IOException ) {
-							onException((IOException) error);							
-						} else {
-							onException(IOExceptionSupport.create(error));							
-						}
-					}
-				});
-		
-		// Send the data via the channel
-//        inputBuffer = ByteBuffer.allocateDirect(8*1024);
-        inputBuffer = ByteBuffer.allocate(8*1024);
+    // private static final Log log = LogFactory.getLog(NIOTransport.class);
+    private SocketChannel channel;
+    private SelectorSelection selection;
+    private ByteBuffer inputBuffer;
+    private ByteBuffer currentBuffer;
+    private int nextFrameSize;
+
+    public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    protected void initializeStreams() throws IOException {
+        channel = socket.getChannel();
+        channel.configureBlocking(false);
+
+        // listen for events telling us when the socket is readable.
+        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+            public void onSelect(SelectorSelection selection) {
+                serviceRead();
+            }
+
+            public void onError(SelectorSelection selection, Throwable error) {
+                if (error instanceof IOException) {
+                    onException((IOException)error);
+                } else {
+                    onException(IOExceptionSupport.create(error));
+                }
+            }
+        });
+
+        // Send the data via the channel
+        // inputBuffer = ByteBuffer.allocateDirect(8*1024);
+        inputBuffer = ByteBuffer.allocate(8 * 1024);
         currentBuffer = inputBuffer;
-        nextFrameSize=-1;
+        nextFrameSize = -1;
         currentBuffer.limit(4);
-        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024));
-        
-	}
-	
+        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
+
+    }
+
     private void serviceRead() {
         try {
-            while( true ) {
-            	
-	
-	            int readSize = channel.read(currentBuffer);
-	            if( readSize == -1 ) {
-					onException(new EOFException());
-	                selection.close();
-	                break;
-	            }
-	            if( readSize==0 ) {
-	                break;
-	            }
-	            
-            	if( currentBuffer.hasRemaining() )
-            		continue;
-
-	            // Are we trying to figure out the size of the next frame?
-	            if( nextFrameSize==-1 ) {
-	            	assert inputBuffer == currentBuffer;
-
-	            	// If the frame is too big to fit in our direct byte buffer,
-	            	// Then allocate a non direct byte buffer of the right size for it.
-	            	inputBuffer.flip();
-	            	nextFrameSize = inputBuffer.getInt()+4;
-	            	if( nextFrameSize > inputBuffer.capacity() ) {
-	            		currentBuffer = ByteBuffer.allocate(nextFrameSize);
-	            		currentBuffer.putInt(nextFrameSize);
-	            	} else {
-	            		inputBuffer.limit(nextFrameSize);	            		
-	            	}
-	            	
-            	} else {
-            		currentBuffer.flip();
-    				
-            		Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
-            		doConsume((Command) command);
-            		
-            		nextFrameSize=-1;
-    				inputBuffer.clear();
-    				inputBuffer.limit(4);
-    				currentBuffer = inputBuffer;
-            	}
-	            
+            while (true) {
+
+                int readSize = channel.read(currentBuffer);
+                if (readSize == -1) {
+                    onException(new EOFException());
+                    selection.close();
+                    break;
+                }
+                if (readSize == 0) {
+                    break;
+                }
+
+                if (currentBuffer.hasRemaining())
+                    continue;
+
+                // Are we trying to figure out the size of the next frame?
+                if (nextFrameSize == -1) {
+                    assert inputBuffer == currentBuffer;
+
+                    // If the frame is too big to fit in our direct byte buffer,
+                    // Then allocate a non direct byte buffer of the right size
+                    // for it.
+                    inputBuffer.flip();
+                    nextFrameSize = inputBuffer.getInt() + 4;
+                    if (nextFrameSize > inputBuffer.capacity()) {
+                        currentBuffer = ByteBuffer.allocate(nextFrameSize);
+                        currentBuffer.putInt(nextFrameSize);
+                    } else {
+                        inputBuffer.limit(nextFrameSize);
+                    }
+
+                } else {
+                    currentBuffer.flip();
+
+                    Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
+                    doConsume((Command)command);
+
+                    nextFrameSize = -1;
+                    inputBuffer.clear();
+                    inputBuffer.limit(4);
+                    currentBuffer = inputBuffer;
+                }
+
             }
-            
+
         } catch (IOException e) {
             onException(e);
         } catch (Throwable e) {
-        	onException(IOExceptionSupport.create(e));
+            onException(IOExceptionSupport.create(e));
         }
     }
 
-
-	protected void doStart() throws Exception {
+    protected void doStart() throws Exception {
         connect();
         selection.setInterestOps(SelectionKey.OP_READ);
         selection.enable();
     }
 
-	protected void doStop(ServiceStopper stopper) throws Exception {
-		selection.disable();
-		super.doStop(stopper);		
-	}
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        selection.disable();
+        super.doStop(stopper);
+    }
 }



Mime
View raw message