activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r432221 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport: TransportDisposedIOException.java vm/VMTransport.java
Date Thu, 17 Aug 2006 13:01:51 GMT
Author: rajdavies
Date: Thu Aug 17 06:01:50 2006
New Revision: 432221

URL: http://svn.apache.org/viewvc?rev=432221&view=rev
Log:
Added optinal async delivery to VMTransport

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java?rev=432221&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
Thu Aug 17 06:01:50 2006
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * This is exception is thrown when the transport is disposed
+ * 
+ * @version $Revision$
+ */
+public class TransportDisposedIOException extends IOException {
+
+    private static final long serialVersionUID=-7107323414439622596L;
+
+    public TransportDisposedIOException() {
+        super();
+    }
+
+    /**
+     * @param message
+     */
+    public TransportDisposedIOException(String message) {
+        super(message);
+    }
+
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=432221&r1=432220&r2=432221&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Thu Aug 17 06:01:50 2006
@@ -1,19 +1,15 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.transport.vm;
 
@@ -23,37 +19,46 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
 /**
  * A Transport implementation that uses direct method invocations.
  * 
  * @version $Revision$
  */
-public class VMTransport implements Transport{
+public class VMTransport implements Transport,Task{
     private static final Log log=LogFactory.getLog(VMTransport.class);
-    private static final AtomicLong nextId = new AtomicLong(0);
-    
+    private static final AtomicLong nextId=new AtomicLong(0);
+    private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
+                    true,1000);
     protected VMTransport peer;
     protected TransportListener transportListener;
     protected boolean disposed;
     protected boolean marshal;
     protected boolean network;
-    protected List queue = Collections.synchronizedList(new LinkedList());
+    protected boolean async=false;
+    protected boolean started=false;
+    protected int asyncQueueDepth=2000;
+    protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
+    protected LinkedBlockingQueue messageQueue;
     protected final URI location;
     protected final long id;
-    
-    public VMTransport(URI location) {
-        this.location = location;
+    private TaskRunner taskRunner;
+
+    public VMTransport(URI location){
+        this.location=location;
         this.id=nextId.getAndIncrement();
     }
 
@@ -66,57 +71,75 @@
     }
 
     public void oneway(Command command) throws IOException{
-        if(disposed)
-            throw new IOException("Transport disposed.");
+        if(disposed){
+            throw new TransportDisposedIOException("Transport disposed.");
+        }
         if(peer==null)
             throw new IOException("Peer not connected.");
-        if (!peer.disposed){
-            TransportListener tl = peer.transportListener;
-            queue = peer.queue;
-            if (tl != null){
+        if(!peer.disposed){
+            final TransportListener tl=peer.transportListener;
+            messageQueue=getMessageQueue();
+            prePeerSetQueue=peer.prePeerSetQueue;
+            if(tl==null){
+                prePeerSetQueue.add(command);
+            }else if(!async){
                 tl.onCommand(command);
-            }else {
-                queue.add(command);
+            }else{
+                try{
+                    messageQueue.put(command);
+                    wakeup();
+                }catch(final InterruptedException e){
+                    log.error("messageQueue interuppted",e);
+                    throw new IOException(e.getMessage());
+                }
             }
-        } else {
-            throw new IOException("Peer disconnected.");
+        }else{
+            throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
         }
     }
 
-    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback)
throws IOException{
+    public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback)
throws IOException{
         throw new AssertionError("Unsupported Method");
     }
 
     public Response request(Command command) throws IOException{
         throw new AssertionError("Unsupported Method");
     }
-    
-    public Response request(Command command,int timeout) throws IOException {
+
+    public Response request(Command command,int timeout) throws IOException{
         throw new AssertionError("Unsupported Method");
     }
 
-    public synchronized TransportListener getTransportListener() {
+    public synchronized TransportListener getTransportListener(){
         return transportListener;
     }
 
     synchronized public void setTransportListener(TransportListener commandListener){
         this.transportListener=commandListener;
+        wakeup();
     }
 
     public synchronized void start() throws Exception{
+        started=true;
         if(transportListener==null)
             throw new IOException("TransportListener not set.");
-        for (Iterator iter = queue.iterator(); iter.hasNext();) {
-            Command command = (Command) iter.next();
+        for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+            Command command=(Command) iter.next();
             transportListener.onCommand(command);
             iter.remove();
         }
+        wakeup();
     }
 
     public void stop() throws Exception{
+        started=false;
         if(!disposed){
             disposed=true;
         }
+        if(taskRunner!=null){
+            taskRunner.shutdown();
+            taskRunner=null;
+        }
     }
 
     public Object narrow(Class target){
@@ -141,16 +164,75 @@
     public void setNetwork(boolean network){
         this.network=network;
     }
-    
-    public String toString() {
+
+    public String toString(){
         return location+"#"+id;
     }
 
-	public String getRemoteAddress() {
-		if(peer != null){
-			return peer.toString();
-		}
-		return null;
-	}
+    public String getRemoteAddress(){
+        if(peer!=null){
+            return peer.toString();
+        }
+        return null;
+    }
 
+    // task implementation
+    public boolean iterate(){
+        TransportListener tl=peer.transportListener;
+        if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
+            Command command=(Command) messageQueue.poll();
+            if(tl!=null){
+                tl.onCommand(command);
+            }
+        }
+        return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
+    }
+
+    /**
+     * @return the async
+     */
+    public boolean isAsync(){
+        return async;
+    }
+
+    /**
+     * @param async the async to set
+     */
+    public void setAsync(boolean async){
+        this.async=async;
+    }
+
+    /**
+     * @return the asyncQueueDepth
+     */
+    public int getAsyncQueueDepth(){
+        return asyncQueueDepth;
+    }
+
+    /**
+     * @param asyncQueueDepth the asyncQueueDepth to set
+     */
+    public void setAsyncQueueDepth(int asyncQueueDepth){
+        this.asyncQueueDepth=asyncQueueDepth;
+    }
+
+    protected void wakeup(){
+        if(async&&messageQueue!=null&&!messageQueue.isEmpty()){
+            if(taskRunner==null){
+                taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+            }
+            try{
+                taskRunner.wakeup();
+            }catch(InterruptedException e){
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    protected synchronized LinkedBlockingQueue getMessageQueue(){
+        if(messageQueue==null){
+            messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+        }
+        return messageQueue;
+    }
 }



Mime
View raw message