activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r593204 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Date Thu, 08 Nov 2007 15:39:52 GMT
Author: chirino
Date: Thu Nov  8 07:39:51 2007
New Revision: 593204

URL: http://svn.apache.org/viewvc?rev=593204&view=rev
Log:
The VMTransport now let's it's peer know when it's being stopped so that the Peer can give
its' transport listener a peer disconnected exception.  Otherwise a VM transport client could
disconnect without the server side knowing it disconnected and the server side would not terminate
it's side of the connection.  This could be seen as a memory leak on when the static network
config is setup and one of the static brokers is not up.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=593204&r1=593203&r2=593204&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Thu Nov  8 07:39:51 2007
@@ -43,6 +43,7 @@
  */
 public class VMTransport implements Transport, Task {
 
+    private static final Object DISCONNECT = new Object();
     private static final AtomicLong NEXT_ID = new AtomicLong(0);
     private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport",
Thread.NORM_PRIORITY, true, 1000);
     protected VMTransport peer;
@@ -91,7 +92,11 @@
                     peer.getMessageQueue().put(command);
                     peer.wakeup();
                 } else {
-                    peer.transportListener.onCommand(command);
+                    if( command == DISCONNECT ) {
+                        peer.transportListener.onException(new TransportDisposedIOException("Peer
(" + peer.toString() + ") disposed."));
+                    } else {
+                        peer.transportListener.onCommand(command);
+                    }
                 }
                 enqueueValve.decrement();
             } else {
@@ -137,6 +142,12 @@
         // If stop() is called while being start()ed.. then we can't stop until we return
to the start() method.
         if( enqueueValve.isOn() ) {
             
+            // let the peer know that we are disconnecting..
+            try {
+                oneway(DISCONNECT);
+            } catch (Exception ignore) {
+            }
+
             TaskRunner tr = null;
             try {
                 enqueueValve.turnOff();
@@ -183,9 +194,13 @@
         }
 
         LinkedBlockingQueue<Object> mq = getMessageQueue();
-        Command command = (Command)mq.poll();
+        Object command = mq.poll();
         if (command != null) {
-            tl.onCommand(command);
+            if( command == DISCONNECT ) {
+                tl.onException(new TransportDisposedIOException("Peer (" + peer.toString()
+ ") disposed."));
+            } else {
+                tl.onCommand(command);
+            }
             return !mq.isEmpty();
         } else {
             return false;



Mime
View raw message