activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r563982 [22/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...
Date Wed, 08 Aug 2007 18:58:13 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java Wed Aug  8 11:56:59 2007
@@ -16,21 +16,19 @@
  */
 package org.apache.activemq.thread;
 
-
 /**
- *
  * @version $Revision: 1.1 $
  */
 class DedicatedTaskRunner implements TaskRunner {
 
     private final Task task;
     private final Thread thread;
-    
+
     private final Object mutex = new Object();
     private boolean threadTerminated;
     private boolean pending;
     private boolean shutdown;
-    
+
     public DedicatedTaskRunner(Task task, String name, int priority, boolean daemon) {
         this.task = task;
         thread = new Thread(name) {
@@ -43,81 +41,83 @@
         thread.setPriority(priority);
         thread.start();
     }
-    
+
     /**
      */
     public void wakeup() throws InterruptedException {
-        synchronized( mutex ) {
-            if( shutdown )
+        synchronized (mutex) {
+            if (shutdown)
                 return;
-            pending=true;            
+            pending = true;
             mutex.notifyAll();
         }
     }
 
     /**
      * shut down the task
-     * @param timeout 
-     * @throws InterruptedException 
+     * 
+     * @param timeout
+     * @throws InterruptedException
      */
-    public void shutdown(long timeout) throws InterruptedException{
-        synchronized(mutex){
-            shutdown=true;
-            pending=true;
+    public void shutdown(long timeout) throws InterruptedException {
+        synchronized (mutex) {
+            shutdown = true;
+            pending = true;
             mutex.notifyAll();
 
-            // Wait till the thread stops ( no need to wait if shutdown 
-            // is called from thread that is shutting down) 
-            if( Thread.currentThread()!=thread && !threadTerminated ){
+            // Wait till the thread stops ( no need to wait if shutdown
+            // is called from thread that is shutting down)
+            if (Thread.currentThread() != thread && !threadTerminated) {
                 mutex.wait(timeout);
             }
         }
-    }      
-    
+    }
+
     /**
      * shut down the task
-     * @throws InterruptedException 
+     * 
+     * @throws InterruptedException
      */
-    public void shutdown() throws InterruptedException{
+    public void shutdown() throws InterruptedException {
         shutdown(0);
     }
-    
+
     final void runTask() {
-        
+
         try {
-            while( true ) {
-             
-                synchronized (mutex) {   
-                    pending=false;
-                    if( shutdown ) {
+            while (true) {
+
+                synchronized (mutex) {
+                    pending = false;
+                    if (shutdown) {
                         return;
                     }
                 }
-                
-                if( !task.iterate() ) {
+
+                if (!task.iterate()) {
                     // wait to be notified.
                     synchronized (mutex) {
-                        if( shutdown ) {
+                        if (shutdown) {
                             return;
                         }
-                        while( !pending ) {
+                        while (!pending) {
                             mutex.wait();
                         }
                     }
                 }
-                
+
             }
-            
+
         } catch (InterruptedException e) {
             // Someone really wants this thread to die off.
             Thread.currentThread().interrupt();
         } finally {
-            // Make sure we notify any waiting threads that thread 
+            // Make sure we notify any waiting threads that thread
             // has terminated.
             synchronized (mutex) {
-                threadTerminated=true;
+                threadTerminated = true;
                 mutex.notifyAll();
-            }            
+            }
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java Wed Aug  8 11:56:59 2007
@@ -19,7 +19,6 @@
 import java.util.concurrent.Executor;
 
 /**
- *
  * @version $Revision: 1.1 $
  */
 class PooledTaskRunner implements TaskRunner {
@@ -32,7 +31,7 @@
     private boolean shutdown;
     private boolean iterating;
     private Thread runningThread;
-    
+
     public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) {
         this.executor = executor;
         this.maxIterationsPerRun = maxIterationsPerRun;
@@ -45,28 +44,31 @@
             }
         };
     }
-    
-
 
     /**
      * We Expect MANY wakeup calls on the same TaskRunner.
      */
     public void wakeup() throws InterruptedException {
-        synchronized( runable ) {
-            
+        synchronized (runable) {
+
             // When we get in here, we make some assumptions of state:
-            // queued=false, iterating=false: wakeup() has not be called and therefore task is not executing.
-            // queued=true,  iterating=false: wakeup() was called but, task execution has not started yet 
-            // queued=false, iterating=true : wakeup() was called, which caused task execution to start.
-            // queued=true,  iterating=true : wakeup() called after task execution was started. 
-            
-            if( queued || shutdown )
+            // queued=false, iterating=false: wakeup() has not be called and
+            // therefore task is not executing.
+            // queued=true, iterating=false: wakeup() was called but, task
+            // execution has not started yet
+            // queued=false, iterating=true : wakeup() was called, which caused
+            // task execution to start.
+            // queued=true, iterating=true : wakeup() called after task
+            // execution was started.
+
+            if (queued || shutdown)
                 return;
-            
-            queued=true;
-            
-            // The runTask() method will do this for me once we are done iterating.
-            if( !iterating ) {
+
+            queued = true;
+
+            // The runTask() method will do this for me once we are done
+            // iterating.
+            if (!iterating) {
                 executor.execute(runable);
             }
         }
@@ -74,63 +76,64 @@
 
     /**
      * shut down the task
-     * @throws InterruptedException 
+     * 
+     * @throws InterruptedException
      */
-    public void shutdown(long timeout) throws InterruptedException{
-        synchronized(runable){
-            shutdown=true;
-            //the check on the thread is done
-            //because a call to iterate can result in
-            //shutDown() being called, which would wait forever
-            //waiting for iterating to finish
-            if(runningThread!=Thread.currentThread()){
-                if(iterating==true){
+    public void shutdown(long timeout) throws InterruptedException {
+        synchronized (runable) {
+            shutdown = true;
+            // the check on the thread is done
+            // because a call to iterate can result in
+            // shutDown() being called, which would wait forever
+            // waiting for iterating to finish
+            if (runningThread != Thread.currentThread()) {
+                if (iterating == true) {
                     runable.wait(timeout);
                 }
             }
         }
-    }        
-    
-    
+    }
+
     public void shutdown() throws InterruptedException {
         shutdown(0);
     }
+
     final void runTask() {
-        
+
         synchronized (runable) {
             queued = false;
-            if( shutdown ) {
+            if (shutdown) {
                 iterating = false;
                 runable.notifyAll();
                 return;
             }
             iterating = true;
         }
-        
-        // Don't synchronize while we are iterating so that 
+
+        // Don't synchronize while we are iterating so that
         // multiple wakeup() calls can be executed concurrently.
-        boolean done=false;
+        boolean done = false;
         for (int i = 0; i < maxIterationsPerRun; i++) {
-            if( !task.iterate() ) {
-                done=true;
+            if (!task.iterate()) {
+                done = true;
                 break;
             }
         }
-        
+
         synchronized (runable) {
-            iterating=false;
-            if( shutdown ) {
-                queued=false;
+            iterating = false;
+            if (shutdown) {
+                queued = false;
                 runable.notifyAll();
                 return;
             }
-            
+
             // If we could not iterate all the items
             // then we need to re-queue.
-            if( !done )
-                queued = true;    
-            
-            if( queued ) {
+            if (!done)
+                queued = true;
+
+            if (queued) {
                 executor.execute(runable);
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Wed Aug  8 11:56:59 2007
@@ -28,14 +28,13 @@
  */
 public class Scheduler {
 
-    
-    public static final ScheduledThreadPoolExecutor clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory(){
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable,"ActiveMQ Scheduler");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+    public static final ScheduledThreadPoolExecutor clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+        public Thread newThread(Runnable runnable) {
+            Thread thread = new Thread(runnable, "ActiveMQ Scheduler");
+            thread.setDaemon(true);
+            return thread;
+        }
+    });
     static {
         clockDaemon.setKeepAliveTime(5, TimeUnit.SECONDS);
     }
@@ -47,12 +46,12 @@
     }
 
     synchronized static public void cancel(Runnable task) {
-        ScheduledFuture ticket = (ScheduledFuture) clockTickets.remove(task);
-        if( ticket!=null ) {
+        ScheduledFuture ticket = (ScheduledFuture)clockTickets.remove(task);
+        if (ticket != null) {
             ticket.cancel(false);
 
             if (ticket instanceof Runnable)
-            	clockDaemon.remove((Runnable) ticket);            
+                clockDaemon.remove((Runnable)ticket);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Wed Aug  8 11:56:59 2007
@@ -24,14 +24,11 @@
 import java.util.concurrent.TimeUnit;
 
 /**
- * Manages the thread pool for long running tasks.
- * 
- * Long running tasks are not always active but when they are active, they may
- * need a few iterations of processing for them to become idle. The manager
- * ensures that each task is processes but that no one task overtakes the
- * system.
- * 
- * This is kina like cooperative multitasking.
+ * Manages the thread pool for long running tasks. Long running tasks are not
+ * always active but when they are active, they may need a few iterations of
+ * processing for them to become idle. The manager ensures that each task is
+ * processes but that no one task overtakes the system. This is kina like
+ * cooperative multitasking.
  * 
  * @version $Revision: 1.5 $
  */
@@ -48,15 +45,17 @@
     }
 
     public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
-        
+
         this.name = name;
         this.priority = priority;
         this.daemon = daemon;
         this.maxIterationsPerRun = maxIterationsPerRun;
-        
-        // If your OS/JVM combination has a good thread model, you may want to avoid 
-        // using a thread pool to run tasks and use a DedicatedTaskRunner instead.
-        if( "true".equals(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner")) ) {
+
+        // If your OS/JVM combination has a good thread model, you may want to
+        // avoid
+        // using a thread pool to run tasks and use a DedicatedTaskRunner
+        // instead.
+        if ("true".equals(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
             executor = null;
         } else {
             executor = createDefaultExecutor();
@@ -68,15 +67,15 @@
             executor.shutdownNow();
         }
     }
-    
+
     public TaskRunner createTaskRunner(Task task, String name) {
-        if( executor!=null ) {
+        if (executor != null) {
             return new PooledTaskRunner(executor, task, maxIterationsPerRun);
         } else {
             return new DedicatedTaskRunner(task, name, priority, daemon);
         }
     }
-    
+
     protected ExecutorService createDefaultExecutor() {
         ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
@@ -86,7 +85,7 @@
                 return thread;
             }
         });
-        //rc.allowCoreThreadTimeOut(true);
+        // rc.allowCoreThreadTimeOut(true);
         return rc;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java Wed Aug  8 11:56:59 2007
@@ -17,90 +17,90 @@
 package org.apache.activemq.thread;
 
 /**
- * A Valve is a synchronization object used enable or disable the "flow" of concurrent
- * processing.
+ * A Valve is a synchronization object used enable or disable the "flow" of
+ * concurrent processing.
  * 
- *  
  * @version $Revision: 1.2 $
  */
 final public class Valve {
-    
+
     private final Object mutex = new Object();
     private boolean on;
-    private int turningOff=0;
-    private int usage=0;
+    private int turningOff = 0;
+    private int usage = 0;
 
     public Valve(boolean on) {
-        this.on = on;        
+        this.on = on;
     }
-    
+
     /**
-     * Turns the valve on.  This method blocks until the valve is off.
-     * @throws InterruptedException 
+     * Turns the valve on. This method blocks until the valve is off.
+     * 
+     * @throws InterruptedException
      */
     public void turnOn() throws InterruptedException {
-        synchronized(mutex) {
-            while( on ) {
+        synchronized (mutex) {
+            while (on) {
                 mutex.wait();
             }
-            on=true;
+            on = true;
             mutex.notifyAll();
-        }        
+        }
     }
 
     boolean isOn() {
-        synchronized(mutex) {
+        synchronized (mutex) {
             return on;
         }
     }
-    
+
     /**
-     * Turns the valve off.  This method blocks until the valve is on and the valve is not 
-     * in use.
-     *  
+     * Turns the valve off. This method blocks until the valve is on and the
+     * valve is not in use.
+     * 
      * @throws InterruptedException
      */
     public void turnOff() throws InterruptedException {
-         synchronized(mutex) {
-             try {
-                 ++turningOff;
-                 while( usage > 0 || !on) {
-                     mutex.wait();
-                 }
-                 on=false;
-                 mutex.notifyAll();
-             } finally {
-                 --turningOff;
-             }
-         }        
+        synchronized (mutex) {
+            try {
+                ++turningOff;
+                while (usage > 0 || !on) {
+                    mutex.wait();
+                }
+                on = false;
+                mutex.notifyAll();
+            } finally {
+                --turningOff;
+            }
+        }
     }
-    
+
     /**
-     * Increments the use counter of the valve.  This method blocks if the valve is off,
-     * or is being turned off.
+     * Increments the use counter of the valve. This method blocks if the valve
+     * is off, or is being turned off.
      * 
      * @throws InterruptedException
      */
     public void increment() throws InterruptedException {
-        synchronized(mutex) {
+        synchronized (mutex) {
             // Do we have to wait for the value to be on?
-            while( turningOff>0 || !on ) {
+            while (turningOff > 0 || !on) {
                 mutex.wait();
             }
             usage++;
-        }        
+        }
     }
-    
+
     /**
      * Decrements the use counter of the valve.
      */
     public void decrement() {
-        synchronized(mutex) {
+        synchronized (mutex) {
             usage--;
-            if( turningOff>0 && usage < 1 ) {
+            if (turningOff > 0 && usage < 1) {
                 mutex.notifyAll();
             }
-        }        
+        }
     }
-    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java Wed Aug  8 11:56:59 2007
@@ -17,13 +17,17 @@
 package org.apache.activemq.transaction;
 
 /**
- * 
  * @version $Revision$
  */
 public class Synchronization {
 
-    public void beforeEnd() throws Exception{}
-    public void afterCommit() throws Exception{}
-    public void afterRollback() throws Exception{}
-    
+    public void beforeEnd() throws Exception {
+    }
+
+    public void afterCommit() throws Exception {
+    }
+
+    public void afterRollback() throws Exception {
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Wed Aug  8 11:56:59 2007
@@ -25,16 +25,16 @@
 import org.apache.activemq.command.TransactionId;
 
 /**
- * Keeps track of all the actions the need to be done when
- * a transaction does a commit or rollback.
- *
+ * Keeps track of all the actions the need to be done when a transaction does a
+ * commit or rollback.
+ * 
  * @version $Revision: 1.5 $
  */
 public abstract class Transaction {
 
-    static final public byte START_STATE = 0;      // can go to: 1,2,3
-    static final public byte IN_USE_STATE = 1;     // can go to: 2,3
-    static final public byte PREPARED_STATE = 2;   // can go to: 3
+    static final public byte START_STATE = 0; // can go to: 1,2,3
+    static final public byte IN_USE_STATE = 1; // can go to: 2,3
+    static final public byte PREPARED_STATE = 2; // can go to: 3
     static final public byte FINISHED_STATE = 3;
 
     private ArrayList synchronizations = new ArrayList();
@@ -54,7 +54,7 @@
             state = IN_USE_STATE;
         }
     }
-    
+
     public void removeSynchronization(Synchronization r) {
         synchronizations.remove(r);
     }
@@ -64,47 +64,49 @@
         // Is it ok to call prepare now given the state of the
         // transaction?
         switch (state) {
-            case START_STATE:
-            case IN_USE_STATE:
-                break;
-            default:
-                XAException xae = new XAException("Prepare cannot be called now.");
-                xae.errorCode = XAException.XAER_PROTO;
-                throw xae;
+        case START_STATE:
+        case IN_USE_STATE:
+            break;
+        default:
+            XAException xae = new XAException("Prepare cannot be called now.");
+            xae.errorCode = XAException.XAER_PROTO;
+            throw xae;
         }
 
-//        // Run the prePrepareTasks
-//        for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
-//            Callback r = (Callback) iter.next();
-//            r.execute();
-//        }
+        // // Run the prePrepareTasks
+        // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
+        // Callback r = (Callback) iter.next();
+        // r.execute();
+        // }
     }
 
     protected void fireAfterCommit() throws Exception {
         for (Iterator iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = (Synchronization) iter.next();
+            Synchronization s = (Synchronization)iter.next();
             s.afterCommit();
         }
     }
 
     public void fireAfterRollback() throws Exception {
         for (Iterator iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = (Synchronization) iter.next();
+            Synchronization s = (Synchronization)iter.next();
             s.afterRollback();
         }
     }
 
     public String toString() {
-        return super.toString() + "[synchronizations=" + synchronizations +"]";
+        return super.toString() + "[synchronizations=" + synchronizations + "]";
     }
 
     abstract public void commit(boolean onePhase) throws XAException, IOException;
+
     abstract public void rollback() throws XAException, IOException;
+
     abstract public int prepare() throws XAException, IOException;
-    
+
     abstract public TransactionId getTransactionId();
 
     public boolean isPrepared() {
-        return getState()==PREPARED_STATE;
+        return getState() == PREPARED_STATE;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Wed Aug  8 11:56:59 2007
@@ -32,9 +32,9 @@
  * @version $Revision: 1.4 $
  */
 public class XATransaction extends Transaction {
-    
+
     private static final Log log = LogFactory.getLog(XATransaction.class);
-    
+
     private final TransactionStore transactionStore;
     private final XATransactionId xid;
     private final TransactionBroker broker;
@@ -46,32 +46,32 @@
     }
 
     public void commit(boolean onePhase) throws XAException, IOException {
-    	if(log.isDebugEnabled())
-    		log.debug("XA Transaction commit: "+xid);
+        if (log.isDebugEnabled())
+            log.debug("XA Transaction commit: " + xid);
 
         switch (getState()) {
-            case START_STATE:
-                // 1 phase commit, no work done.
-                checkForPreparedState(onePhase);
-                setStateFinished();
-                break;
-            case IN_USE_STATE:
-                // 1 phase commit, work done.
-                checkForPreparedState(onePhase);
-                doPrePrepare();
-                setStateFinished();
-                transactionStore.commit(getTransactionId(), false);
-                doPostCommit();
-                break;
-            case PREPARED_STATE:
-                // 2 phase commit, work done.
-                // We would record commit here.
-                setStateFinished();
-                transactionStore.commit(getTransactionId(), true);
-                doPostCommit();
-                break;
-            default:
-                illegalStateTransition("commit");
+        case START_STATE:
+            // 1 phase commit, no work done.
+            checkForPreparedState(onePhase);
+            setStateFinished();
+            break;
+        case IN_USE_STATE:
+            // 1 phase commit, work done.
+            checkForPreparedState(onePhase);
+            doPrePrepare();
+            setStateFinished();
+            transactionStore.commit(getTransactionId(), false);
+            doPostCommit();
+            break;
+        case PREPARED_STATE:
+            // 2 phase commit, work done.
+            // We would record commit here.
+            setStateFinished();
+            transactionStore.commit(getTransactionId(), true);
+            doPostCommit();
+            break;
+        default:
+            illegalStateTransition("commit");
         }
     }
 
@@ -107,9 +107,8 @@
     private void doPostCommit() throws XAException {
         try {
             fireAfterCommit();
-        }
-        catch (Throwable e) {
-            // I guess this could happen.  Post commit task failed
+        } catch (Throwable e) {
+            // I guess this could happen. Post commit task failed
             // to execute properly.
             log.warn("POST COMMIT FAILED: ", e);
             XAException xae = new XAException("POST COMMIT FAILED");
@@ -120,27 +119,27 @@
     }
 
     public void rollback() throws XAException, IOException {
-    	
-    	if(log.isDebugEnabled())
-    		log.debug("XA Transaction rollback: "+xid);
+
+        if (log.isDebugEnabled())
+            log.debug("XA Transaction rollback: " + xid);
 
         switch (getState()) {
-            case START_STATE:
-                // 1 phase rollback no work done.
-                setStateFinished();
-                break;
-            case IN_USE_STATE:
-                // 1 phase rollback work done.
-                setStateFinished();
-                transactionStore.rollback(getTransactionId());
-                doPostRollback();
-                break;
-            case PREPARED_STATE:
-                // 2 phase rollback work done.
-                setStateFinished();
-                transactionStore.rollback(getTransactionId());
-                doPostRollback();
-                break;
+        case START_STATE:
+            // 1 phase rollback no work done.
+            setStateFinished();
+            break;
+        case IN_USE_STATE:
+            // 1 phase rollback work done.
+            setStateFinished();
+            transactionStore.rollback(getTransactionId());
+            doPostRollback();
+            break;
+        case PREPARED_STATE:
+            // 2 phase rollback work done.
+            setStateFinished();
+            transactionStore.rollback(getTransactionId());
+            doPostRollback();
+            break;
         }
 
     }
@@ -148,9 +147,8 @@
     private void doPostRollback() throws XAException {
         try {
             fireAfterRollback();
-        }
-        catch (Throwable e) {
-            // I guess this could happen.  Post commit task failed
+        } catch (Throwable e) {
+            // I guess this could happen. Post commit task failed
             // to execute properly.
             log.warn("POST ROLLBACK FAILED: ", e);
             XAException xae = new XAException("POST ROLLBACK FAILED");
@@ -161,23 +159,23 @@
     }
 
     public int prepare() throws XAException, IOException {
-    	if(log.isDebugEnabled())
-    		log.debug("XA Transaction prepare: "+xid);
-    	
+        if (log.isDebugEnabled())
+            log.debug("XA Transaction prepare: " + xid);
+
         switch (getState()) {
-            case START_STATE:
-                // No work done.. no commit/rollback needed.
-                setStateFinished();
-                return XAResource.XA_RDONLY;
-            case IN_USE_STATE:
-                // We would record prepare here.
-                doPrePrepare();
-                setState(Transaction.PREPARED_STATE);
-                transactionStore.prepare(getTransactionId());
-                return XAResource.XA_OK;
-            default :
-                illegalStateTransition("prepare");
-                return XAResource.XA_RDONLY;
+        case START_STATE:
+            // No work done.. no commit/rollback needed.
+            setStateFinished();
+            return XAResource.XA_RDONLY;
+        case IN_USE_STATE:
+            // We would record prepare here.
+            doPrePrepare();
+            setState(Transaction.PREPARED_STATE);
+            transactionStore.prepare(getTransactionId());
+            return XAResource.XA_OK;
+        default:
+            illegalStateTransition("prepare");
+            return XAResource.XA_RDONLY;
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java Wed Aug  8 11:56:59 2007
@@ -28,19 +28,18 @@
 
 public class FutureResponse {
     private static final Log log = LogFactory.getLog(FutureResponse.class);
-           
+
     private final ResponseCallback responseCallback;
     private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
-    
+
     public FutureResponse(ResponseCallback responseCallback) {
         this.responseCallback = responseCallback;
     }
 
     public Response getResult() throws IOException {
         try {
-            return (Response) responseSlot.take();
-        }
-        catch (InterruptedException e) {
+            return (Response)responseSlot.take();
+        } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             if (log.isDebugEnabled()) {
                 log.debug("Operation interupted: " + e, e);
@@ -48,20 +47,20 @@
             throw new InterruptedIOException("Interrupted.");
         }
     }
-    
+
     public Response getResult(int timeout) throws IOException {
         try {
-            return (Response) responseSlot.poll(timeout,TimeUnit.MILLISECONDS);
+            return (Response)responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             throw new InterruptedIOException("Interrupted.");
         }
     }
-    
+
     public void set(Response result) {
-        if( responseSlot.offer(result) ) {
-            if( responseCallback !=null ) {
+        if (responseSlot.offer(result)) {
+            if (responseCallback != null) {
                 responseCallback.onCompletion(this);
-            }        
+            }
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Wed Aug  8 11:56:59 2007
@@ -26,23 +26,24 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Used to make sure that commands are arriving periodically from the peer of the transport.
- *
+ * Used to make sure that commands are arriving periodically from the peer of
+ * the transport.
+ * 
  * @version $Revision$
  */
 public class InactivityMonitor extends TransportFilter {
 
-    private final Log log = LogFactory.getLog(InactivityMonitor.class);
+    private final Log LOG = LogFactory.getLog(InactivityMonitor.class);
 
     private WireFormatInfo localWireFormatInfo;
     private WireFormatInfo remoteWireFormatInfo;
-    private final AtomicBoolean monitorStarted= new AtomicBoolean(false);
+    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
 
-    private final AtomicBoolean commandSent=new AtomicBoolean(false);
-    private final AtomicBoolean inSend=new AtomicBoolean(false);
+    private final AtomicBoolean commandSent = new AtomicBoolean(false);
+    private final AtomicBoolean inSend = new AtomicBoolean(false);
 
-    private final AtomicBoolean commandReceived=new AtomicBoolean(true);
-    private final AtomicBoolean inReceive=new AtomicBoolean(false);
+    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
+    private final AtomicBoolean inReceive = new AtomicBoolean(false);
 
     private final Runnable readChecker = new Runnable() {
         public void run() {
@@ -56,7 +57,6 @@
         }
     };
 
-
     public InactivityMonitor(Transport next) {
         super(next);
     }
@@ -66,23 +66,22 @@
         next.stop();
     }
 
-
     final void writeCheck() {
-        synchronized(writeChecker) {
-            if( inSend.get() ) {
-                log.trace("A send is in progress");
+        synchronized (writeChecker) {
+            if (inSend.get()) {
+                LOG.trace("A send is in progress");
                 return;
             }
 
-            if( !commandSent.get() ) {
-                log.trace("No message sent since last write check, sending a KeepAliveInfo");
+            if (!commandSent.get()) {
+                LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
                 try {
                     next.oneway(new KeepAliveInfo());
                 } catch (IOException e) {
                     onException(e);
                 }
             } else {
-                log.trace("Message sent since last write check, resetting flag");
+                LOG.trace("Message sent since last write check, resetting flag");
             }
 
             commandSent.set(false);
@@ -90,17 +89,17 @@
     }
 
     final void readCheck() {
-        synchronized(readChecker) {
-            if( inReceive.get() ) {
-                log.trace("A receive is in progress");
+        synchronized (readChecker) {
+            if (inReceive.get()) {
+                LOG.trace("A receive is in progress");
                 return;
             }
 
-            if( !commandReceived.get() ) {
-                log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            if (!commandReceived.get()) {
+                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
                 onException(new InactivityIOException("Channel was inactive for too long."));
             } else {
-                log.trace("Message received since last read check, resetting flag: ");
+                LOG.trace("Message received since last read check, resetting flag: ");
             }
 
             commandReceived.set(false);
@@ -109,12 +108,12 @@
     }
 
     public void onCommand(Object command) {
-        synchronized(readChecker) {
+        synchronized (readChecker) {
             inReceive.set(true);
             try {
-                if( command.getClass() == WireFormatInfo.class ) {
-                    synchronized( this ) {
-                        remoteWireFormatInfo = (WireFormatInfo) command;
+                if (command.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        remoteWireFormatInfo = (WireFormatInfo)command;
                         try {
                             startMonitorThreads();
                         } catch (IOException e) {
@@ -130,16 +129,15 @@
         }
     }
 
-
     public void oneway(Object o) throws IOException {
-        synchronized(writeChecker) {
+        synchronized (writeChecker) {
             // Disable inactivity monitoring while processing a command.
             inSend.set(true);
             commandSent.set(true);
             try {
-                if( o.getClass() == WireFormatInfo.class ) {
-                    synchronized( this ) {
-                        localWireFormatInfo = (WireFormatInfo) o;
+                if (o.getClass() == WireFormatInfo.class) {
+                    synchronized (this) {
+                        localWireFormatInfo = (WireFormatInfo)o;
                         startMonitorThreads();
                     }
                 }
@@ -151,25 +149,24 @@
     }
 
     public void onException(IOException error) {
-    	if( monitorStarted.get() ) {
-	        stopMonitorThreads();
-    	}
+        if (monitorStarted.get()) {
+            stopMonitorThreads();
+        }
         getTransportListener().onException(error);
     }
 
-
     synchronized private void startMonitorThreads() throws IOException {
-        if( monitorStarted.get() )
+        if (monitorStarted.get())
             return;
-        if( localWireFormatInfo == null )
+        if (localWireFormatInfo == null)
             return;
-        if( remoteWireFormatInfo == null )
+        if (remoteWireFormatInfo == null)
             return;
 
         long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
-        if( l > 0 ) {
+        if (l > 0) {
             monitorStarted.set(true);
-            Scheduler.executePeriodically(writeChecker, l/2);
+            Scheduler.executePeriodically(writeChecker, l / 2);
             Scheduler.executePeriodically(readChecker, l);
         }
     }
@@ -178,11 +175,10 @@
      *
      */
     synchronized private void stopMonitorThreads() {
-        if( monitorStarted.compareAndSet(true, false) ) {
+        if (monitorStarted.compareAndSet(true, false)) {
             Scheduler.cancel(readChecker);
             Scheduler.cancel(writeChecker);
         }
     }
-
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java Wed Aug  8 11:56:59 2007
@@ -19,45 +19,43 @@
 import java.io.IOException;
 import org.apache.activemq.command.ShutdownInfo;
 
-
-
 /**
  * @version $Revision$
  */
 public class MutexTransport extends TransportFilter {
 
     private final Object writeMutex = new Object();
-    
+
     public MutexTransport(Transport next) {
         super(next);
     }
 
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
-        synchronized(writeMutex) {
+        synchronized (writeMutex) {
             return next.asyncRequest(command, null);
         }
     }
 
-    public void oneway(Object command) throws IOException{
-        synchronized(writeMutex){
+    public void oneway(Object command) throws IOException {
+        synchronized (writeMutex) {
             next.oneway(command);
         }
     }
 
     public Object request(Object command) throws IOException {
-        synchronized(writeMutex) {
+        synchronized (writeMutex) {
             return next.request(command);
         }
     }
-    
-    public Object request(Object command,int timeout) throws IOException {
-        synchronized(writeMutex){
-            return next.request(command,timeout);
+
+    public Object request(Object command, int timeout) throws IOException {
+        synchronized (writeMutex) {
+            return next.request(command, timeout);
         }
     }
-    
+
     public String toString() {
         return next.toString();
     }
-    
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Wed Aug  8 11:56:59 2007
@@ -28,95 +28,96 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Adds the incrementing sequence number to commands along with performing the corelation of responses to requests to
- * create a blocking request-response semantics.
+ * Adds the incrementing sequence number to commands along with performing the
+ * corelation of responses to requests to create a blocking request-response
+ * semantics.
  * 
  * @version $Revision: 1.4 $
  */
-public class ResponseCorrelator extends TransportFilter{
+public class ResponseCorrelator extends TransportFilter {
 
-    private static final Log log=LogFactory.getLog(ResponseCorrelator.class);
-    private final Map requestMap=new HashMap();
+    private static final Log LOG = LogFactory.getLog(ResponseCorrelator.class);
+    private final Map requestMap = new HashMap();
     private IntSequenceGenerator sequenceGenerator;
-    private final boolean debug=log.isDebugEnabled();
+    private final boolean debug = LOG.isDebugEnabled();
 
-    public ResponseCorrelator(Transport next){
-        this(next,new IntSequenceGenerator());
+    public ResponseCorrelator(Transport next) {
+        this(next, new IntSequenceGenerator());
     }
 
-    public ResponseCorrelator(Transport next,IntSequenceGenerator sequenceGenerator){
+    public ResponseCorrelator(Transport next, IntSequenceGenerator sequenceGenerator) {
         super(next);
-        this.sequenceGenerator=sequenceGenerator;
+        this.sequenceGenerator = sequenceGenerator;
     }
 
-    public void oneway(Object o) throws IOException{
-        Command command=(Command)o;
+    public void oneway(Object o) throws IOException {
+        Command command = (Command)o;
         command.setCommandId(sequenceGenerator.getNextSequenceId());
         command.setResponseRequired(false);
         next.oneway(command);
     }
 
-    public FutureResponse asyncRequest(Object o,ResponseCallback responseCallback) throws IOException{
-        Command command=(Command)o;
+    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
+        Command command = (Command)o;
         command.setCommandId(sequenceGenerator.getNextSequenceId());
         command.setResponseRequired(true);
-        FutureResponse future=new FutureResponse(responseCallback);
-        synchronized(requestMap){
-            requestMap.put(new Integer(command.getCommandId()),future);
+        FutureResponse future = new FutureResponse(responseCallback);
+        synchronized (requestMap) {
+            requestMap.put(new Integer(command.getCommandId()), future);
         }
         next.oneway(command);
         return future;
     }
 
-    public Object request(Object command) throws IOException{
-        FutureResponse response=asyncRequest(command,null);
+    public Object request(Object command) throws IOException {
+        FutureResponse response = asyncRequest(command, null);
         return response.getResult();
     }
 
-    public Object request(Object command,int timeout) throws IOException{
-        FutureResponse response=asyncRequest(command,null);
+    public Object request(Object command, int timeout) throws IOException {
+        FutureResponse response = asyncRequest(command, null);
         return response.getResult(timeout);
     }
 
-    public void onCommand(Object o){
-        Command command=(Command)o;
-        if(command.isResponse()){
-            Response response=(Response)command;
-            FutureResponse future=null;
-            synchronized(requestMap){
-                future=(FutureResponse)requestMap.remove(Integer.valueOf(response.getCorrelationId()));
+    public void onCommand(Object o) {
+        Command command = (Command)o;
+        if (command.isResponse()) {
+            Response response = (Response)command;
+            FutureResponse future = null;
+            synchronized (requestMap) {
+                future = (FutureResponse)requestMap.remove(Integer.valueOf(response.getCorrelationId()));
             }
-            if(future!=null){
+            if (future != null) {
                 future.set(response);
-            }else{
-                if(debug)
-                    log.debug("Received unexpected response for command id: "+response.getCorrelationId());
+            } else {
+                if (debug)
+                    LOG.debug("Received unexpected response for command id: " + response.getCorrelationId());
             }
-        }else{
+        } else {
             getTransportListener().onCommand(command);
         }
     }
 
     /**
-     * If an async exception occurs, then assume no responses will arrive for any of current requests. Lets let them
-     * know of the problem.
+     * If an async exception occurs, then assume no responses will arrive for
+     * any of current requests. Lets let them know of the problem.
      */
-    public void onException(IOException error){
+    public void onException(IOException error) {
         // Copy and Clear the request Map
-        ArrayList requests=new ArrayList(requestMap.values());
+        ArrayList requests = new ArrayList(requestMap.values());
         requestMap.clear();
-        for(Iterator iter=requests.iterator();iter.hasNext();){
-            FutureResponse fr=(FutureResponse)iter.next();
+        for (Iterator iter = requests.iterator(); iter.hasNext();) {
+            FutureResponse fr = (FutureResponse)iter.next();
             fr.set(new ExceptionResponse(error));
         }
         super.onException(error);
     }
 
-    public IntSequenceGenerator getSequenceGenerator(){
+    public IntSequenceGenerator getSequenceGenerator() {
         return sequenceGenerator;
     }
 
-    public String toString(){
+    public String toString() {
         return next.toString();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Wed Aug  8 11:56:59 2007
@@ -16,103 +16,108 @@
  */
 package org.apache.activemq.transport;
 
-import org.apache.activemq.Service;
-
 import java.io.IOException;
 
+import org.apache.activemq.Service;
+
 /**
- * Represents the client side of a transport allowing messages
- * to be sent synchronously, asynchronously and consumed.
- *
+ * Represents the client side of a transport allowing messages to be sent
+ * synchronously, asynchronously and consumed.
+ * 
  * @version $Revision: 1.5 $
  */
 public interface Transport extends Service {
 
     /**
      * A one way asynchronous send
-     * @param command 
-     * @throws IOException 
+     * 
+     * @param command
+     * @throws IOException
      */
     public void oneway(Object command) throws IOException;
 
     /**
-     * An asynchronous request response where the Receipt will be returned
-     * in the future.  If responseCallback is not null, then it will be called
-     * when the response has been completed.
+     * An asynchronous request response where the Receipt will be returned in
+     * the future. If responseCallback is not null, then it will be called when
+     * the response has been completed.
      * 
-     * @param command 
+     * @param command
      * @param responseCallback TODO
      * @return the FutureResponse
-     * @throws IOException 
+     * @throws IOException
      */
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException;
-    
+
     /**
      * A synchronous request response
-     * @param command 
+     * 
+     * @param command
      * @return the response
-     * @throws IOException 
+     * @throws IOException
      */
     public Object request(Object command) throws IOException;
 
     /**
      * A synchronous request response
-     * @param command 
-     * @param timeout 
+     * 
+     * @param command
+     * @param timeout
      * @return the repsonse or null if timeout
-     * @throws IOException 
+     * @throws IOException
      */
     public Object request(Object command, int timeout) throws IOException;
 
-    
-//    /**
-//     * A one way asynchronous send
-//     * @param command 
-//     * @throws IOException 
-//     */
-//    public void oneway(Command command) throws IOException;
-//
-//    /**
-//     * An asynchronous request response where the Receipt will be returned
-//     * in the future.  If responseCallback is not null, then it will be called
-//     * when the response has been completed.
-//     * 
-//     * @param command 
-//     * @param responseCallback TODO
-//     * @return the FutureResponse
-//     * @throws IOException 
-//     */
-//    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException;
-//    
-//    /**
-//     * A synchronous request response
-//     * @param command 
-//     * @return the response
-//     * @throws IOException 
-//     */
-//    public Response request(Command command) throws IOException;
-//
-//    /**
-//     * A synchronous request response
-//     * @param command 
-//     * @param timeout 
-//     * @return the repsonse or null if timeout
-//     * @throws IOException 
-//     */
-//    public Response request(Command command, int timeout) throws IOException;
-    
+    // /**
+    // * A one way asynchronous send
+    // * @param command
+    // * @throws IOException
+    // */
+    // public void oneway(Command command) throws IOException;
+    //
+    // /**
+    // * An asynchronous request response where the Receipt will be returned
+    // * in the future. If responseCallback is not null, then it will be called
+    // * when the response has been completed.
+    // *
+    // * @param command
+    // * @param responseCallback TODO
+    // * @return the FutureResponse
+    // * @throws IOException
+    // */
+    // public FutureResponse asyncRequest(Command command, ResponseCallback
+    // responseCallback) throws IOException;
+    //    
+    // /**
+    // * A synchronous request response
+    // * @param command
+    // * @return the response
+    // * @throws IOException
+    // */
+    // public Response request(Command command) throws IOException;
+    //
+    // /**
+    // * A synchronous request response
+    // * @param command
+    // * @param timeout
+    // * @return the repsonse or null if timeout
+    // * @throws IOException
+    // */
+    // public Response request(Command command, int timeout) throws IOException;
+
     /**
      * Returns the current transport listener
-     * @return 
+     * 
+     * @return
      */
     public TransportListener getTransportListener();
 
     /**
      * Registers an inbound command listener
-     * @param commandListener 
+     * 
+     * @param commandListener
      */
     public void setTransportListener(TransportListener commandListener);
-    
+
     /**
      * @param target
      * @return the target
@@ -121,12 +126,12 @@
 
     /**
      * @return the remote address for this connection
-     *  
      */
-	public String getRemoteAddress();
-    
+    public String getRemoteAddress();
+
     /**
      * Indicates if the transport can handle faults
+     * 
      * @return tru if fault tolerant
      */
     public boolean isFaultTolerant();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java Wed Aug  8 11:56:59 2007
@@ -25,7 +25,7 @@
  */
 public class TransportDisposedIOException extends IOException {
 
-    private static final long serialVersionUID=-7107323414439622596L;
+    private static final long serialVersionUID = -7107323414439622596L;
 
     public TransportDisposedIOException() {
         super();
@@ -37,6 +37,5 @@
     public TransportDisposedIOException(String message) {
         super(message);
     }
-
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Wed Aug  8 11:56:59 2007
@@ -118,8 +118,7 @@
                 throw new IllegalArgumentException("Invalid connect parameters: " + options);
             }
             return rc;
-        }
-        catch (URISyntaxException e) {
+        } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
     }
@@ -135,16 +134,16 @@
             }
             return rc;
 
-        }
-        catch (URISyntaxException e) {
+        } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
     }
 
     /**
      * Factory method to create a new transport
-     * @throws IOException 
-     * @throws UnknownHostException 
+     * 
+     * @throws IOException
+     * @throws UnknownHostException
      */
     protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
         throw new IOException("createTransport() method not implemented!");
@@ -157,16 +156,15 @@
      */
     private static TransportFactory findTransportFactory(URI location) throws IOException {
         String scheme = location.getScheme();
-        if( scheme == null )
+        if (scheme == null)
             throw new IOException("Transport not scheme specified: [" + location + "]");
-        TransportFactory tf = (TransportFactory) transportFactorys.get(scheme);
+        TransportFactory tf = (TransportFactory)transportFactorys.get(scheme);
         if (tf == null) {
             // Try to load if from a META-INF property.
             try {
-                tf = (TransportFactory) transportFactoryFinder.newInstance(scheme);
+                tf = (TransportFactory)transportFactoryFinder.newInstance(scheme);
                 transportFactorys.put(scheme, tf);
-            }
-            catch (Throwable e) {
+            } catch (Throwable e) {
                 throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
             }
         }
@@ -180,16 +178,15 @@
     }
 
     protected WireFormatFactory createWireFormatFactory(Map options) throws IOException {
-        String wireFormat = (String) options.get("wireFormat");
+        String wireFormat = (String)options.get("wireFormat");
         if (wireFormat == null)
             wireFormat = getDefaultWireFormatType();
 
         try {
-            WireFormatFactory wff = (WireFormatFactory) wireFormatFactoryFinder.newInstance(wireFormat);
+            WireFormatFactory wff = (WireFormatFactory)wireFormatFactoryFinder.newInstance(wireFormat);
             IntrospectionSupport.setProperties(wff, options, "wireFormat.");
             return wff;
-        }
-        catch (Throwable e) {
+        } catch (Throwable e) {
             throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
         }
     }
@@ -199,8 +196,8 @@
     }
 
     /**
-     * Fully configures and adds all need transport filters so that the transport
-     * can be used by the JMS client.
+     * Fully configures and adds all need transport filters so that the
+     * transport can be used by the JMS client.
      * 
      * @param transport
      * @param wf
@@ -209,19 +206,19 @@
      * @throws Exception
      */
     public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
-    	transport = compositeConfigure(transport, wf, options);
-    	
+        transport = compositeConfigure(transport, wf, options);
+
         transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
-        
+
         return transport;
     }
 
     /**
-     * Fully configures and adds all need transport filters so that the transport
-     * can be used by the ActiveMQ message broker.  The main difference between this and the 
-     * configure() method is that the broker does not issue requests to the client so the
-     * ResponseCorrelator is not needed.
+     * Fully configures and adds all need transport filters so that the
+     * transport can be used by the ActiveMQ message broker. The main difference
+     * between this and the configure() method is that the broker does not issue
+     * requests to the client so the ResponseCorrelator is not needed.
      * 
      * @param transport
      * @param format
@@ -229,15 +226,16 @@
      * @return
      * @throws Exception
      */
-	public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
-    	transport = compositeConfigure(transport, format, options);    	
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        transport = compositeConfigure(transport, format, options);
         transport = new MutexTransport(transport);
         return transport;
-	}
-    
+    }
+
     /**
-     * Similar to configure(...) but this avoid adding in the MutexTransport and ResponseCorrelator transport layers
-     * so that the resulting transport can more efficiently be used as part of a composite transport.
+     * Similar to configure(...) but this avoid adding in the MutexTransport and
+     * ResponseCorrelator transport layers so that the resulting transport can
+     * more efficiently be used as part of a composite transport.
      * 
      * @param transport
      * @param format

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Aug  8 11:56:59 2007
@@ -17,24 +17,25 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+
 /**
  * @version $Revision: 1.5 $
  */
-public class TransportFilter implements TransportListener,Transport{
+public class TransportFilter implements TransportListener, Transport {
     final protected Transport next;
     protected TransportListener transportListener;
 
-    public TransportFilter(Transport next){
-        this.next=next;
+    public TransportFilter(Transport next) {
+        this.next = next;
     }
 
-    public TransportListener getTransportListener(){
+    public TransportListener getTransportListener() {
         return transportListener;
     }
 
-    public void setTransportListener(TransportListener channelListener){
-        this.transportListener=channelListener;
-        if(channelListener==null)
+    public void setTransportListener(TransportListener channelListener) {
+        this.transportListener = channelListener;
+        if (channelListener == null)
             next.setTransportListener(null);
         else
             next.setTransportListener(this);
@@ -42,13 +43,12 @@
 
     /**
      * @see org.apache.activemq.Service#start()
-     * @throws IOException
-     *             if the next channel has not been set.
+     * @throws IOException if the next channel has not been set.
      */
-    public void start() throws Exception{
-        if(next==null)
+    public void start() throws Exception {
+        if (next == null)
             throw new IOException("The next channel has not been set.");
-        if(transportListener==null)
+        if (transportListener == null)
             throw new IOException("The command listener has not been set.");
         next.start();
     }
@@ -56,69 +56,69 @@
     /**
      * @see org.apache.activemq.Service#stop()
      */
-    public void stop() throws Exception{
+    public void stop() throws Exception {
         next.stop();
     }
 
-    public void onCommand(Object command){
+    public void onCommand(Object command) {
         transportListener.onCommand(command);
     }
 
     /**
      * @return Returns the next.
      */
-    public Transport getNext(){
+    public Transport getNext() {
         return next;
     }
 
-    public String toString(){
+    public String toString() {
         return next.toString();
     }
 
-    public void oneway(Object command) throws IOException{
+    public void oneway(Object command) throws IOException {
         next.oneway(command);
     }
 
-    public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
-        return next.asyncRequest(command,null);
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+        return next.asyncRequest(command, null);
     }
 
-    public Object request(Object command) throws IOException{
+    public Object request(Object command) throws IOException {
         return next.request(command);
     }
 
-    public Object request(Object command,int timeout) throws IOException{
-        return next.request(command,timeout);
+    public Object request(Object command, int timeout) throws IOException {
+        return next.request(command, timeout);
     }
 
-    public void onException(IOException error){
+    public void onException(IOException error) {
         transportListener.onException(error);
     }
 
-    public void transportInterupted(){
+    public void transportInterupted() {
         transportListener.transportInterupted();
     }
 
-    public void transportResumed(){
+    public void transportResumed() {
         transportListener.transportResumed();
     }
 
-    public Object narrow(Class target){
-        if(target.isAssignableFrom(getClass())){
+    public Object narrow(Class target) {
+        if (target.isAssignableFrom(getClass())) {
             return this;
         }
         return next.narrow(target);
     }
 
-	public String getRemoteAddress() {
-		return next.getRemoteAddress();
-	}
+    public String getRemoteAddress() {
+        return next.getRemoteAddress();
+    }
 
     /**
      * @return
      * @see org.apache.activemq.transport.Transport#isFaultTolerant()
      */
-    public boolean isFaultTolerant(){
+    public boolean isFaultTolerant() {
         return next.isFaultTolerant();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java Wed Aug  8 11:56:59 2007
@@ -21,19 +21,18 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 /**
  * @version $Revision$
  */
 public class TransportLogger extends TransportFilter {
 
-    private static int lastId=0;
+    private static int lastId;
     private final Log log;
-    
+
     public TransportLogger(Transport next) {
-        this( next, LogFactory.getLog(TransportLogger.class.getName()+".Connection:"+getNextId()));
+        this(next, LogFactory.getLog(TransportLogger.class.getName() + ".Connection:" + getNextId()));
     }
-    
+
     synchronized private static int getNextId() {
         return ++lastId;
     }
@@ -44,46 +43,46 @@
     }
 
     public Object request(Object command) throws IOException {
-        log.debug("SENDING REQUEST: "+command);
-    	Object rc = super.request(command);
-        log.debug("GOT RESPONSE: "+rc);
-    	return rc;
+        log.debug("SENDING REQUEST: " + command);
+        Object rc = super.request(command);
+        log.debug("GOT RESPONSE: " + rc);
+        return rc;
     }
-    
+
     public Object request(Object command, int timeout) throws IOException {
-        log.debug("SENDING REQUEST: "+command);
+        log.debug("SENDING REQUEST: " + command);
         Object rc = super.request(command, timeout);
-        log.debug("GOT RESPONSE: "+rc);
-    	return rc;
+        log.debug("GOT RESPONSE: " + rc);
+        return rc;
     }
-    
+
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
-        log.debug("SENDING ASNYC REQUEST: "+command);
-    	FutureResponse rc = next.asyncRequest(command, responseCallback);
-    	return rc;
+        log.debug("SENDING ASNYC REQUEST: " + command);
+        FutureResponse rc = next.asyncRequest(command, responseCallback);
+        return rc;
     }
-    
+
     public void oneway(Object command) throws IOException {
-        if( log.isDebugEnabled() ) {
-            log.debug("SENDING: "+command);
+        if (log.isDebugEnabled()) {
+            log.debug("SENDING: " + command);
         }
         next.oneway(command);
     }
-    
+
     public void onCommand(Object command) {
-        if( log.isDebugEnabled() ) {
+        if (log.isDebugEnabled()) {
             log.debug("RECEIVED: " + command);
         }
         getTransportListener().onCommand(command);
     }
-    
+
     public void onException(IOException error) {
-        if( log.isDebugEnabled() ) {
-            log.debug("RECEIVED Exception: "+error, error);
+        if (log.isDebugEnabled()) {
+            log.debug("RECEIVED Exception: " + error, error);
         }
         getTransportListener().onException(error);
     }
-    
+
     public String toString() {
         return next.toString();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java Wed Aug  8 11:56:59 2007
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.transport;
 
-import org.apache.activemq.util.ServiceSupport;
-
 import java.net.URI;
 
+import org.apache.activemq.util.ServiceSupport;
+
 /**
  * A useful base class for implementations of {@link TransportServer}
  * 
@@ -28,7 +28,7 @@
 public abstract class TransportServerSupport extends ServiceSupport implements TransportServer {
 
     private URI connectURI;
-	private URI bindLocation;
+    private URI bindLocation;
     private TransportAcceptListener acceptListener;
 
     public TransportServerSupport() {
@@ -63,8 +63,7 @@
     }
 
     /**
-     * @param location
-     *            The location to set.
+     * @param location The location to set.
      */
     public void setConnectURI(URI location) {
         this.connectURI = location;
@@ -76,11 +75,11 @@
         }
     }
 
-	public URI getBindLocation() {
-		return bindLocation;
-	}
-
-	public void setBindLocation(URI bindLocation) {
-		this.bindLocation = bindLocation;
-	}
+    public URI getBindLocation() {
+        return bindLocation;
+    }
+
+    public void setBindLocation(URI bindLocation) {
+        this.bindLocation = bindLocation;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Wed Aug  8 11:56:59 2007
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.transport;
 
-
 import org.apache.activemq.ThreadPriorities;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
@@ -36,7 +35,8 @@
     private boolean daemon = true;
     private boolean joinOnStop = true;
     private Thread runner;
-    private long stackSize=0;//should be a multiple of 128k
+ // should be a multiple of 128k
+    private long stackSize = 0;
 
     public TransportServerThreadSupport() {
     }
@@ -56,13 +56,13 @@
         this.daemon = daemon;
     }
 
-    
     public boolean isJoinOnStop() {
         return joinOnStop;
     }
 
     /**
-     * Sets whether the background read thread is joined with (waited for) on a stop
+     * Sets whether the background read thread is joined with (waited for) on a
+     * stop
      */
     public void setJoinOnStop(boolean joinOnStop) {
         this.joinOnStop = joinOnStop;
@@ -70,7 +70,7 @@
 
     protected void doStart() throws Exception {
         log.info("Listening for connections at: " + getConnectURI());
-        runner = new Thread(null,this, "ActiveMQ Transport Server: "+toString(),stackSize);
+        runner = new Thread(null, this, "ActiveMQ Transport Server: " + toString(), stackSize);
         runner.setDaemon(daemon);
         runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
         runner.start();
@@ -83,19 +83,17 @@
         }
     }
 
-    
     /**
      * @return the stackSize
      */
-    public long getStackSize(){
+    public long getStackSize() {
         return this.stackSize;
     }
 
-    
     /**
      * @param stackSize the stackSize to set
      */
-    public void setStackSize(long stackSize){
-        this.stackSize=stackSize;
+    public void setStackSize(long stackSize) {
+        this.stackSize = stackSize;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Wed Aug  8 11:56:59 2007
@@ -70,8 +70,8 @@
     public Object request(Object command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
-    
-    public Object request(Object command,int timeout) throws IOException {
+
+    public Object request(Object command, int timeout) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
@@ -82,8 +82,7 @@
         if (command != null) {
             if (transportListener != null) {
                 transportListener.onCommand(command);
-            }
-            else {
+            } else {
                 log.error("No transportListener available to process inbound command: " + command);
             }
         }
@@ -100,11 +99,11 @@
 
     protected void checkStarted() throws IOException {
         if (!isStarted()) {
-			throw new IOException("The transport is not running.");
+            throw new IOException("The transport is not running.");
         }
     }
-    
-    public boolean isFaultTolerant(){
+
+    public boolean isFaultTolerant() {
         return false;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java Wed Aug  8 11:56:59 2007
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.transport;
 
-
-
 /**
  * A useful base class for a transport implementation which has a background
  * reading thread.
@@ -28,7 +26,8 @@
 
     private boolean daemon = false;
     private Thread runner;
-    private long stackSize=0;//should be a multiple of 128k
+    // should be a multiple of 128k
+    private long stackSize = 0;
 
     public boolean isDaemon() {
         return daemon;
@@ -39,24 +38,22 @@
     }
 
     protected void doStart() throws Exception {
-        runner = new Thread(null,this, "ActiveMQ Transport: "+toString(),stackSize);
+        runner = new Thread(null, this, "ActiveMQ Transport: " + toString(), stackSize);
         runner.setDaemon(daemon);
         runner.start();
     }
 
-    
     /**
      * @return the stackSize
      */
-    public long getStackSize(){
+    public long getStackSize() {
         return this.stackSize;
     }
 
-    
     /**
      * @param stackSize the stackSize to set
      */
-    public void setStackSize(long stackSize){
-        this.stackSize=stackSize;
+    public void setStackSize(long stackSize) {
+        this.stackSize = stackSize;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java Wed Aug  8 11:56:59 2007
@@ -30,22 +30,21 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 /**
  * Negotiates the wire format with a new connection
  */
 public class WireFormatNegotiator extends TransportFilter {
 
-    private static final Log log = LogFactory.getLog(WireFormatNegotiator.class);
-    
+    private static final Log LOG = LogFactory.getLog(WireFormatNegotiator.class);
+
     private OpenWireFormat wireFormat;
     private final int minimumVersion;
-    private long negotiateTimeout=15000;
-    
-    private final AtomicBoolean firstStart=new AtomicBoolean(true);
+    private long negotiateTimeout = 15000L;
+
+    private final AtomicBoolean firstStart = new AtomicBoolean(true);
     private final CountDownLatch readyCountDownLatch = new CountDownLatch(1);
     private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
-    
+
     /**
      * Negotiator
      * 
@@ -60,31 +59,30 @@
         this.minimumVersion = minimumVersion;
     }
 
-    
     public void start() throws Exception {
         super.start();
-        if( firstStart.compareAndSet(true, false) ) {
-        	try {
-        		WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
-                if (log.isDebugEnabled()) {
-                    log.debug("Sending: " + info);
+        if (firstStart.compareAndSet(true, false)) {
+            try {
+                WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sending: " + info);
                 }
-	            sendWireFormat(info);
-        	} finally {
-        		wireInfoSentDownLatch.countDown();
-        	}
+                sendWireFormat(info);
+            } finally {
+                wireInfoSentDownLatch.countDown();
+            }
         }
     }
-    
+
     public void stop() throws Exception {
-    	super.stop();
+        super.stop();
         readyCountDownLatch.countDown();
     }
-    
+
     public void oneway(Object command) throws IOException {
         try {
-            if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) ) 
-            	throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+            if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS))
+                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new InterruptedIOException();
@@ -92,60 +90,54 @@
         super.oneway(command);
     }
 
- 
     public void onCommand(Object o) {
-    	Command command = (Command) o;
-        if( command.isWireFormatInfo() ) {
-            WireFormatInfo info = (WireFormatInfo) command;
-            if (log.isDebugEnabled()) {
-                log.debug("Received WireFormat: " + info);
+        Command command = (Command)o;
+        if (command.isWireFormatInfo()) {
+            WireFormatInfo info = (WireFormatInfo)command;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Received WireFormat: " + info);
             }
-            
+
             try {
                 wireInfoSentDownLatch.await();
-                
-                if (log.isDebugEnabled()) {
-                    log.debug(this + " before negotiation: " + wireFormat);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(this + " before negotiation: " + wireFormat);
                 }
-                if( !info.isValid() ) {
+                if (!info.isValid()) {
                     onException(new IOException("Remote wire format magic is invalid"));
-                } else if( info.getVersion() < minimumVersion ) {
-                    onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")"));
+                } else if (info.getVersion() < minimumVersion) {
+                    onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
                 }
-                
+
                 wireFormat.renegotiateWireFormat(info);
-                
-                if (log.isDebugEnabled()) {
-                    log.debug(this + " after negotiation: " + wireFormat);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(this + " after negotiation: " + wireFormat);
                 }
-	
+
             } catch (IOException e) {
                 onException(e);
             } catch (InterruptedException e) {
-                onException((IOException) new InterruptedIOException().initCause(e));
+                onException((IOException)new InterruptedIOException().initCause(e));
             } catch (Exception e) {
                 onException(IOExceptionSupport.create(e));
-			}
+            }
             readyCountDownLatch.countDown();
             onWireFormatNegotiated(info);
         }
         getTransportListener().onCommand(command);
     }
 
-
     public void onException(IOException error) {
         readyCountDownLatch.countDown();
         /*
-        try {
-            super.oneway(new ExceptionResponse(error));
-        }
-        catch (IOException e) {
-            // ignore as we are already throwing an exception
-        }
-        */
+         * try { super.oneway(new ExceptionResponse(error)); } catch
+         * (IOException e) { // ignore as we are already throwing an exception }
+         */
         super.onException(error);
     }
-    
+
     public String toString() {
         return next.toString();
     }
@@ -153,16 +145,15 @@
     protected void sendWireFormat(WireFormatInfo info) throws IOException {
         next.oneway(info);
     }
-    
+
     protected void onWireFormatNegotiated(WireFormatInfo info) {
     }
 
+    public long getNegotiateTimeout() {
+        return negotiateTimeout;
+    }
 
-	public long getNegotiateTimeout() {
-		return negotiateTimeout;
-	}
-
-	public void setNegotiateTimeout(long negotiateTimeout) {
-		this.negotiateTimeout = negotiateTimeout;
-	}
+    public void setNegotiateTimeout(long negotiateTimeout) {
+        this.negotiateTimeout = negotiateTimeout;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java Wed Aug  8 11:56:59 2007
@@ -26,7 +26,7 @@
 
 public abstract class DiscoveryAgentFactory {
 
-    static final private FactoryFinder discoveryAgentFinder = new FactoryFinder("META-INF/services/org/apache/activemq/transport/discoveryagent/");    
+    static final private FactoryFinder discoveryAgentFinder = new FactoryFinder("META-INF/services/org/apache/activemq/transport/discoveryagent/");
     static final private ConcurrentHashMap discoveryAgentFactorys = new ConcurrentHashMap();
 
     /**
@@ -36,22 +36,21 @@
      */
     private static DiscoveryAgentFactory findDiscoveryAgentFactory(URI uri) throws IOException {
         String scheme = uri.getScheme();
-        if( scheme == null )
+        if (scheme == null)
             throw new IOException("DiscoveryAgent scheme not specified: [" + uri + "]");
-        DiscoveryAgentFactory daf = (DiscoveryAgentFactory) discoveryAgentFactorys.get(scheme);
+        DiscoveryAgentFactory daf = (DiscoveryAgentFactory)discoveryAgentFactorys.get(scheme);
         if (daf == null) {
             // Try to load if from a META-INF property.
             try {
-                daf = (DiscoveryAgentFactory) discoveryAgentFinder.newInstance(scheme);
+                daf = (DiscoveryAgentFactory)discoveryAgentFinder.newInstance(scheme);
                 discoveryAgentFactorys.put(scheme, daf);
-            }
-            catch (Throwable e) {
+            } catch (Throwable e) {
                 throw IOExceptionSupport.create("DiscoveryAgent scheme NOT recognized: [" + scheme + "]", e);
             }
         }
         return daf;
     }
-    
+
     public static DiscoveryAgent createDiscoveryAgent(URI uri) throws IOException {
         DiscoveryAgentFactory tf = findDiscoveryAgentFactory(uri);
         return tf.doCreateDiscoveryAgent(uri);
@@ -59,19 +58,22 @@
     }
 
     abstract protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException;
-//    {
-//        try {
-//            String type = ( uri.getScheme() == null ) ? uri.getPath() : uri.getScheme();
-//            DiscoveryAgent rc = (DiscoveryAgent) discoveryAgentFinder.newInstance(type);
-//            Map options = URISupport.parseParamters(uri);
-//            IntrospectionSupport.setProperties(rc, options);
-//            if( rc.getClass() == SimpleDiscoveryAgent.class ) {
-//                CompositeData data = URISupport.parseComposite(uri);
-//                ((SimpleDiscoveryAgent)rc).setServices(data.getComponents());
-//            }
-//            return rc;
-//        } catch (Throwable e) {
-//            throw IOExceptionSupport.create("Could not create discovery agent: "+uri, e);
-//        }
-//    }   
+    // {
+    // try {
+    // String type = ( uri.getScheme() == null ) ? uri.getPath() :
+    // uri.getScheme();
+    // DiscoveryAgent rc = (DiscoveryAgent)
+    // discoveryAgentFinder.newInstance(type);
+    // Map options = URISupport.parseParamters(uri);
+    // IntrospectionSupport.setProperties(rc, options);
+    // if( rc.getClass() == SimpleDiscoveryAgent.class ) {
+    // CompositeData data = URISupport.parseComposite(uri);
+    // ((SimpleDiscoveryAgent)rc).setServices(data.getComponents());
+    // }
+    // return rc;
+    // } catch (Throwable e) {
+    // throw IOExceptionSupport.create("Could not create discovery agent: "+uri,
+    // e);
+    // }
+    // }
 }



Mime
View raw message