activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r586185 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Date Fri, 19 Oct 2007 00:00:46 GMT
Author: chirino
Date: Thu Oct 18 17:00:45 2007
New Revision: 586185

URL: http://svn.apache.org/viewvc?rev=586185&view=rev
Log:
The VM transport could deadlock between the iterate() method and the oneway() method when
the async message buffer used by the transport fills up.  Change the synchronization logic
to make use of a Valve, so that mutexes no longer need to be held for so long.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=586185&r1=586184&r2=586185&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Thu Oct 18 17:00:45 2007
@@ -17,14 +17,17 @@
 package org.apache.activemq.transport.vm;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
@@ -54,23 +57,17 @@
     protected final URI location;
     protected final long id;
     private TaskRunner taskRunner;
-    private final Object mutex = new Object();
-
+    private final Object lazyInitMutext = new Object();
+    private final Valve enqueueValve = new Valve(true);
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    
     public VMTransport(URI location) {
         this.location = location;
         this.id = NEXT_ID.getAndIncrement();
     }
 
-    public VMTransport getPeer() {
-        synchronized (mutex) {
-            return peer;
-        }
-    }
-
     public void setPeer(VMTransport peer) {
-        synchronized (mutex) {
-            this.peer = peer;
-        }
+        this.peer = peer;
     }
 
     public void oneway(Object command) throws IOException {
@@ -81,78 +78,42 @@
             throw new IOException("Peer not connected.");
         }
 
-        TransportListener tl = null;
-        synchronized (peer.mutex) {
-            if (peer.disposed) {
+        try {
+            // Disable the peer from changing his state while we try to enqueue onto him.
+            peer.enqueueValve.increment();
+        
+            if (peer.disposed || peer.stopping.get()) {
                 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
             }
+            
             if (peer.started) {
                 if (peer.async) {
-                    peer.enqueue(command);
+                    peer.getMessageQueue().put(command);
                     peer.wakeup();
                 } else {
-                    tl = peer.transportListener;
+                    peer.transportListener.onCommand(command);
                 }
+                enqueueValve.decrement();
             } else {
-                peer.enqueue(command);
+                peer.getMessageQueue().put(command);
             }
-        }
-
-        if (tl != null) {
-            tl.onCommand(command);
-        }
-
-    }
-
-    private void enqueue(Object command) throws IOException {
-        try {
-            getMessageQueue().put(command);
-        } catch (final InterruptedException e) {
+            
+        } catch (InterruptedException e) {
             throw IOExceptionSupport.create(e);
+        } finally {
+            // Allow the peer to change state again...
+            peer.enqueueValve.decrement();
         }
-    }
 
-    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback)
throws IOException {
-        throw new AssertionError("Unsupported Method");
-    }
-
-    public Object request(Object command) throws IOException {
-        throw new AssertionError("Unsupported Method");
-    }
-
-    public Object request(Object command, int timeout) throws IOException {
-        throw new AssertionError("Unsupported Method");
-    }
-
-    public TransportListener getTransportListener() {
-        synchronized (mutex) {
-            return transportListener;
-        }
-    }
-
-    public void setTransportListener(TransportListener commandListener) {
-        synchronized (mutex) {
-            this.transportListener = commandListener;
-            wakeup();
-        }
-    }
-
-    private LinkedBlockingQueue<Object> getMessageQueue() {
-        synchronized (mutex) {
-            if (messageQueue == null) {
-                messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
-            }
-            return messageQueue;
-        }
     }
 
     public void start() throws Exception {
         if (transportListener == null) {
             throw new IOException("TransportListener not set.");
         }
-
-        synchronized (mutex) {
-            if (messageQueue != null) {
+        try {
+            enqueueValve.turnOff();
+            if (messageQueue != null && !async) {
                 Object command;
                 while ((command = messageQueue.poll()) != null) {
                     transportListener.onCommand(command);
@@ -160,12 +121,16 @@
             }
             started = true;
             wakeup();
+        } finally {
+            enqueueValve.turnOn();
         }
     }
 
     public void stop() throws Exception {
         TaskRunner tr = null;
-        synchronized (mutex) {
+        try {
+            stopping.set(true);
+            enqueueValve.turnOff();
             if (!disposed) {
                 started = false;
                 disposed = true;
@@ -174,11 +139,88 @@
                     taskRunner = null;
                 }
             }
+        } finally {
+            stopping.set(false);
+            enqueueValve.turnOn();
         }
         if (tr != null) {
             tr.shutdown(1000);
         }
     }
+    
+    /**
+     * @see org.apache.activemq.thread.Task#iterate()
+     */
+    public boolean iterate() {
+        
+        final TransportListener tl;
+        try {
+            // Disable changing the state variables while we are running... 
+            enqueueValve.increment();
+            tl = transportListener;
+            if (!started || disposed || tl == null || stopping.get()) {
+                if( stopping.get() ) {
+                    // drain the queue it since folks could be blocked putting on to
+                    // it and that would not allow the stop() method for finishing up.
+                    getMessageQueue().clear();  
+                }
+                return false;
+            }
+        } catch (InterruptedException e) {
+            return false;
+        } finally {
+            enqueueValve.decrement();
+        }
+
+        LinkedBlockingQueue<Object> mq = getMessageQueue();
+        Command command = (Command)mq.poll();
+        if (command != null) {
+            tl.onCommand(command);
+            return !mq.isEmpty();
+        } else {
+            return false;
+        }
+        
+    }
+
+    public void setTransportListener(TransportListener commandListener) {
+        try {
+            try {
+                enqueueValve.turnOff();
+                this.transportListener = commandListener;
+                wakeup();
+            } finally {
+                enqueueValve.turnOn();
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private LinkedBlockingQueue<Object> getMessageQueue() {
+        synchronized (lazyInitMutext) {
+            if (messageQueue == null) {
+                messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
+            }
+            return messageQueue;
+        }
+    }
+
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback)
throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public Object request(Object command) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public Object request(Object command, int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public TransportListener getTransportListener() {
+        return transportListener;
+    }
 
     public <T> T narrow(Class<T> target) {
         if (target.isAssignableFrom(getClass())) {
@@ -215,28 +257,6 @@
     }
 
     /**
-     * @see org.apache.activemq.thread.Task#iterate()
-     */
-    public boolean iterate() {
-        final TransportListener tl;
-        synchronized (mutex) {
-            tl = transportListener;
-            if (!started || disposed || tl == null) {
-                return false;
-            }
-        }
-
-        LinkedBlockingQueue<Object> mq = getMessageQueue();
-        final Command command = (Command)mq.poll();
-        if (command != null) {
-            tl.onCommand(command);
-            return !mq.isEmpty();
-        } else {
-            return false;
-        }
-    }
-
-    /**
      * @return the async
      */
     public boolean isAsync() {
@@ -266,7 +286,7 @@
 
     protected void wakeup() {
         if (async) {
-            synchronized (mutex) {
+            synchronized (lazyInitMutext) {
                 if (taskRunner == null) {
                     taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport:
" + toString());
                 }



Mime
View raw message