activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r432664 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Date Fri, 18 Aug 2006 18:02:53 GMT
Author: rajdavies
Date: Fri Aug 18 11:02:49 2006
New Revision: 432664

URL: http://svn.apache.org/viewvc?rev=432664&view=rev
Log:
tidy code for easier maintenance

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=432664&r1=432663&r2=432664&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Fri Aug 18 11:02:49 2006
@@ -77,26 +77,37 @@
         if(peer==null)
             throw new IOException("Peer not connected.");
         if(!peer.disposed){
-            final TransportListener tl=peer.transportListener;
-            messageQueue=getMessageQueue();
-            prePeerSetQueue=peer.prePeerSetQueue;
-            if(tl==null){
-                prePeerSetQueue.add(command);
-            }else if(!async){
-                tl.onCommand(command);
+           
+            if(async){
+               asyncOneWay(command); 
             }else{
-                try{
-                    messageQueue.put(command);
-                    wakeup();
-                }catch(final InterruptedException e){
-                    log.error("messageQueue interuppted",e);
-                    throw new IOException(e.getMessage());
-                }
+                syncOneWay(command);
             }
         }else{
             throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
         }
     }
+    
+    protected void syncOneWay(Command command){
+        final TransportListener tl=peer.transportListener;
+        prePeerSetQueue=peer.prePeerSetQueue;
+        if(tl==null){
+            prePeerSetQueue.add(command);
+        }else{
+            tl.onCommand(command);
+        }
+    }
+    
+    protected void asyncOneWay(Command command) throws IOException{
+        messageQueue=getMessageQueue();
+        try{
+            messageQueue.put(command);
+            wakeup();
+        }catch(final InterruptedException e){
+            log.error("messageQueue interupted",e);
+            throw new IOException(e.getMessage());
+        }
+    }
 
     public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback)
throws IOException{
         throw new AssertionError("Unsupported Method");
@@ -117,18 +128,23 @@
     synchronized public void setTransportListener(TransportListener commandListener){
         this.transportListener=commandListener;
         wakeup();
+        peer.wakeup();
     }
 
     public synchronized void start() throws Exception{
         started=true;
         if(transportListener==null)
             throw new IOException("TransportListener not set.");
-        for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
-            Command command=(Command) iter.next();
-            transportListener.onCommand(command);
-            iter.remove();
+        if(!async){
+            for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+                Command command=(Command) iter.next();
+                transportListener.onCommand(command);
+                iter.remove();
+            }
+        }else{
+            wakeup();
+            peer.wakeup();
         }
-        wakeup();
     }
 
     public void stop() throws Exception{
@@ -176,14 +192,14 @@
         return null;
     }
 
-    // task implementation
+    /**
+     * @see org.apache.activemq.thread.Task#iterate()
+     */
     public boolean iterate(){
-        TransportListener tl=peer.transportListener;
+        final TransportListener tl=peer.transportListener;
         if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
-            Command command=(Command) messageQueue.poll();
-            if(tl!=null){
-                tl.onCommand(command);
-            }
+            final Command command=(Command) messageQueue.poll();
+            tl.onCommand(command);
         }
         return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
     }



Mime
View raw message