activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r663059 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
Date Wed, 04 Jun 2008 10:52:13 GMT
Author: rajdavies
Date: Wed Jun  4 03:52:13 2008
New Revision: 663059

URL: http://svn.apache.org/viewvc?rev=663059&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1760

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=663059&r1=663058&r2=663059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Wed Jun  4 03:52:13 2008
@@ -118,6 +118,7 @@
     // Connection state variables
     private final ConnectionInfo info;
     private ExceptionListener exceptionListener;
+    private ClientInternalExceptionListener clientInternalExceptionListener;
     private boolean clientIDSet;
     private boolean isConnectionInfoSentToBroker;
     private boolean userSpecifiedClientID;
@@ -404,7 +405,7 @@
      * associated with it.
      * 
      * @return the <CODE>ExceptionListener</CODE> for this connection, or
-     *         null. if no <CODE>ExceptionListener</CODE> is associated with
+     *         null, if no <CODE>ExceptionListener</CODE> is associated with
      *         this connection.
      * @throws JMSException if the JMS provider fails to get the
      *                 <CODE>ExceptionListener</CODE> for this connection.
@@ -444,6 +445,32 @@
     }
 
     /**
+     * Gets the <code>ClientInternalExceptionListener</code> object for this
connection.
+     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
+     * associated with it.
+     * 
+     * @return the listener or <code>null</code> if no listener is registered
with the connection.
+     */
+    public ClientInternalExceptionListener getClientInternalExceptionListener()
+    {
+        return clientInternalExceptionListener;
+    }
+
+    /**
+     * Sets a client internal exception listener for this connection.
+     * The connection will notify the listener, if one has been registered, of exceptions
thrown by container components
+     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing
of a message.
+     * It does this by calling the listener's <code>onException()</code> method
passing it a <code>Throwable</code>
+     * describing the problem.
+     * 
+     * @param listener the exception listener
+     */
+    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
+    {
+        this.clientInternalExceptionListener = listener;
+    }
+    
+    /**
      * Starts (or restarts) a connection's delivery of incoming messages. A call
      * to <CODE>start</CODE> on a connection that has already been started is
      * ignored.
@@ -1672,7 +1699,7 @@
                     }
                 });
             } catch (Exception e) {
-                onAsyncException(e);
+                onClientInternalException(e);
             }
 
         }
@@ -1687,6 +1714,30 @@
     }
 
     /**
+     * Handles async client internal exceptions.
+     * A client internal exception is usually one that has been thrown
+     * by a container runtie component during asynchronous processing of a
+     * message that does not affect the connection itself.
+     * This method notifies the <code>ClientInternalExceptionListener</code>
by invoking
+     * its <code>onException</code> method, if one has been registered with this
connection.
+     * 
+     * @param error the exception that the problem
+     */
+    public void onClientInternalException(final Throwable error) {
+        if ( !closed.get() && !closing.get() ) {
+            if ( this.clientInternalExceptionListener != null ) {
+                asyncConnectionThread.execute(new Runnable() {
+                    public void run() {
+                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
+                    }
+                });
+            } else {
+                LOG.debug("Async client internal exception occurred with no exception listener
registered: " 
+                        + error, error);
+            }
+        }
+    }
+    /**
      * Used for handling async exceptions
      * 
      * @param error

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java?rev=663059&r1=663058&r2=663059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
Wed Jun  4 03:52:13 2008
@@ -139,7 +139,7 @@
                 ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
                 session = (ActiveMQSession)queueSession.getNext();
             } else {
-                connection.onAsyncException(new JMSException("Session pool provided an invalid
session type: " + s.getClass()));
+                connection.onClientInternalException(new JMSException("Session pool provided
an invalid session type: " + s.getClass()));
                 return;
             }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=663059&r1=663058&r2=663059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Wed Jun  4 03:52:13 2008
@@ -1013,7 +1013,7 @@
                 Thread.yield();
             }
         } catch (Exception e) {
-            session.connection.onAsyncException(e);
+            session.connection.onClientInternalException(e);
         }
     }
 
@@ -1057,7 +1057,7 @@
                     listener.onMessage(message);
                     afterMessageIsConsumed(md, false);
                 } catch (JMSException e) {
-                    session.connection.onAsyncException(e);
+                    session.connection.onClientInternalException(e);
                 }
                 return true;
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java?rev=663059&r1=663058&r2=663059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
Wed Jun  4 03:52:13 2008
@@ -186,7 +186,7 @@
                     return answer;
                 }
             } catch (JMSException e) {
-                this.session.connection.onAsyncException(e);
+                this.session.connection.onClientInternalException(e);
                 return null;
             }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=663059&r1=663058&r2=663059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed
Jun  4 03:52:13 2008
@@ -733,10 +733,15 @@
 
             try {
                 messageListener.onMessage(message);
-            } catch (Throwable e) {
-                // TODO: figure out proper way to handle error.
+            } catch (RuntimeException e) {
                 LOG.error("error dispatching message: ", e);
-                connection.onAsyncException(e);
+                // A problem while invoking the MessageListener does not
+                // in general indicate a problem with the connection to the broker, i.e.
+                // it will usually be sufficient to let the afterDelivery() method either
+                // commit or roll back in order to deal with the exception.
+                // However, we notify any registered client internal exception listener
+                // of the problem.
+                connection.onClientInternalException(e);
             }
 
             try {
@@ -786,7 +791,7 @@
                 }
                 asyncSendPacket(ack);
             } catch (Throwable e) {
-                connection.onAsyncException(e);
+                connection.onClientInternalException(e);
             }
 
             if (deliveryListener != null) {
@@ -1431,7 +1436,7 @@
             executor.execute(messageDispatch);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            connection.onAsyncException(e);
+            connection.onClientInternalException(e);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java?rev=663059&r1=663058&r2=663059&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java Wed
Jun  4 03:52:13 2008
@@ -72,7 +72,7 @@
                 connection.asyncSendPacket(ack);
                 deliveredCounter = 0;
             } catch (JMSException e) {
-                connection.onAsyncException(e);
+                connection.onClientInternalException(e);
             }
         }
 

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java?rev=663059&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
Wed Jun  4 03:52:13 2008
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+/**
+ * An exception listener similar to the standard <code>javax.jms.ExceptionListener</code>
+ * which can be used by client code to be notified of exceptions thrown by container components

+ * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing
of a message.
+ * <p>
+ * The <code>org.apache.activemq.ActiveMQConnection</code> that the listener
has been registered with does
+ * this by calling the listener's <code>onException()</code> method passing it
a <code>Throwable</code> describing
+ * the problem.
+ * </p>
+ * 
+ * @author Kai Hudalla
+ * @see ActiveMQConnection#setClientInternalExceptionListener(org.apache.activemq.ClientInternalExceptionListener)
+ */
+public interface ClientInternalExceptionListener
+{
+    /**
+     * Notifies a client of an exception while asynchronously processing a message.
+     * 
+     * @param exception the exception describing the problem
+     */
+    void onException(Throwable exception);
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message