activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r386608 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Date Fri, 17 Mar 2006 11:14:14 GMT
Author: jstrachan
Date: Fri Mar 17 03:14:11 2006
New Revision: 386608

URL: http://svn.apache.org/viewcvs?rev=386608&view=rev
Log:
patch for AMQ-600 to catch IOException caused by attempts to dispatch synchronously to a connection
on a dead socket and treat them as a transport exception (rather than service exception),
disposing the connection so that clientID's can be reused

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=386608&r1=386607&r2=386608&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Fri Mar 17 03:14:11 2006
@@ -72,6 +72,7 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.JMSExceptionSupport;
@@ -85,7 +86,7 @@
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 
-public class ActiveMQConnection extends DefaultTransportListener implements Connection, TopicConnection,
QueueConnection, StatsCapable, Closeable,  StreamConnection {
+public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection,
StatsCapable, Closeable,  StreamConnection, TransportListener {
 
     public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
 
@@ -130,6 +131,7 @@
     private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList();
     private final CopyOnWriteArrayList inputStreams = new CopyOnWriteArrayList();
     private final CopyOnWriteArrayList outputStreams = new CopyOnWriteArrayList();
+    private final CopyOnWriteArrayList transportListeners = new CopyOnWriteArrayList();
 
     // Maps ConsumerIds to ActiveMQConsumer objects
     private final ConcurrentHashMap dispatchers = new ConcurrentHashMap();
@@ -147,7 +149,6 @@
     private IOException firstFailureError;
 
 
-
     /**
      * Construct an <code>ActiveMQConnection</code>
      * @param transport 
@@ -790,6 +791,17 @@
         this.useRetroactiveConsumer = useRetroactiveConsumer;
     }
 
+    /**
+     * Adds a transport listener so that a client can be notified of events in the underlying

+     * transport
+     */
+    public void addTransportListener(TransportListener transportListener) {
+        transportListeners.add(transportListener);
+    }
+    
+    public void removeTransportListener(TransportListener transportListener) {
+        transportListeners.remove(transportListener);
+    }
     
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -1175,7 +1187,7 @@
      */
     protected void ensureConnectionInfoSent() throws JMSException {
         // Can we skip sending the ConnectionInfo packet??
-        if (isConnectionInfoSentToBroker) {
+        if (isConnectionInfoSentToBroker || closed.get()) {
             return;
         }
 
@@ -1241,7 +1253,7 @@
         }
 
         if(isConnectionInfoSentToBroker){
-            if(!transportFailed.get()){
+            if(!transportFailed.get() && !closing.get()){
                 asyncSendPacket(info.createRemoveCommand());
             }
             isConnectionInfoSentToBroker=false;
@@ -1368,6 +1380,10 @@
                 onAsyncException(((ConnectionError)command).getException());
             }
         }
+        for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+            TransportListener listener = (TransportListener) iter.next();
+            listener.onCommand(command);
+        }
     }
 
     /**
@@ -1386,14 +1402,33 @@
             }
         }
     }
-
     
     public void onException(IOException error) {
         onAsyncException(error);
         transportFailed(error);
         ServiceSupport.dispose(this.transport);
         brokerInfoReceived.countDown();
+
+        for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+            TransportListener listener = (TransportListener) iter.next();
+            listener.onException(error);
+        }
+    }
+    
+    public void transportInterupted() {
+        for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+            TransportListener listener = (TransportListener) iter.next();
+            listener.transportInterupted();
+        }
     }
+
+    public void transportResumed() {
+        for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
+            TransportListener listener = (TransportListener) iter.next();
+            listener.transportResumed();
+        }
+    }
+
 
     /**
      * Create the DestinationInfo object for the temporary destination.



Mime
View raw message