activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r375721 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQXAConnection.java ConnectionFailedException.java
Date Tue, 07 Feb 2006 21:40:24 GMT
Author: rajdavies
Date: Tue Feb  7 13:40:23 2006
New Revision: 375721

URL: http://svn.apache.org/viewcvs?rev=375721&view=rev
Log:
receive() returns null on connection transport failure

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Tue Feb  7 13:40:23 2006
@@ -126,6 +126,7 @@
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
     private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();
     private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList();
     private final CopyOnWriteArrayList inputStreams = new CopyOnWriteArrayList();
@@ -246,7 +247,7 @@
      * @since 1.1
      */
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQSession(this, getNextSessionId(), (transacted ? Session.SESSION_TRANSACTED
                 : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
: acknowledgeMode)), asyncDispatch);
@@ -273,7 +274,7 @@
      *             connection due to some internal error.
      */
     public String getClientID() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         return this.info.getClientId();
     }
 
@@ -319,7 +320,7 @@
      *             configured.
      */
     public void setClientID(String newClientID) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
 
         if (this.clientIDSet) {
             throw new IllegalStateException("The clientID has already been set");
@@ -344,7 +345,7 @@
      * @see javax.jms.ConnectionMetaData
      */
     public ConnectionMetaData getMetaData() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         return ActiveMQConnectionMetaData.INSTANCE;
     }
 
@@ -362,7 +363,7 @@
      * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
      */
     public ExceptionListener getExceptionListener() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         return this.exceptionListener;
     }
 
@@ -391,7 +392,7 @@
      *             this connection.
      */
     public void setExceptionListener(ExceptionListener listener) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         this.exceptionListener = listener;
     }
 
@@ -406,7 +407,7 @@
      * @see javax.jms.Connection#stop()
      */
     public void start() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         if (started.compareAndSet(false, true)) {
             for (Iterator i = sessions.iterator(); i.hasNext();) {
@@ -456,7 +457,7 @@
      * @see javax.jms.Connection#start()
      */
     public void stop() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         if (started.compareAndSet(true, false)) {
             for (Iterator i = sessions.iterator(); i.hasNext();) {
                 ActiveMQSession s = (ActiveMQSession) i.next();
@@ -647,7 +648,7 @@
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
             String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean
noLocal)
             throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         SessionId sessionId = new SessionId(info.getConnectionId(), -1);
         ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator
@@ -945,7 +946,7 @@
 
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
         
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         ConsumerId consumerId = createConsumerId();
         ConsumerInfo info = new ConsumerInfo(consumerId);
@@ -1092,6 +1093,19 @@
 
     /**
      * simply throws an exception if the Connection is already closed
+     * or the Transport has failed
+     * 
+     * @throws JMSException
+     */
+    protected synchronized void checkClosedOrFailed() throws JMSException {
+        checkClosed();
+        if (transportFailed.get()){
+            throw new ConnectionFailedException();
+        }
+    }
+    
+    /**
+     * simply throws an exception if the Connection is already closed
      * 
      * @throws JMSException
      */
@@ -1315,9 +1329,11 @@
             } else {
                 log.warn("Async exception with no exception listener: " + error, error);
             }
+            transportFailed(error);
         }
     }
 
+    
     public void onException(IOException error) {
         onAsyncException(error);
         ServiceSupport.dispose(this.transport);
@@ -1359,7 +1375,7 @@
      */
     public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException
{
         
-        checkClosed();        
+        checkClosedOrFailed();        
         activeTempDestinations.remove(destination);
 
         DestinationInfo info = new DestinationInfo();
@@ -1394,7 +1410,7 @@
 
     public void destroyDestination(ActiveMQDestination destination) throws JMSException {
         
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
 
         DestinationInfo info = new DestinationInfo();
@@ -1447,7 +1463,7 @@
     }
     
     private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean
noLocal, String subName) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest),
messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
     }
@@ -1458,7 +1474,7 @@
     }
 
     public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode,
int priority, long timeToLive) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest),
streamProperties, deliveryMode, priority, timeToLive);
     }
@@ -1484,7 +1500,7 @@
      * @since 1.1
      */
     public void unsubscribe(String name) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
         rsi.setConnectionId(getConnectionInfo().getConnectionId());
         rsi.setSubcriptionName(name);
@@ -1500,7 +1516,7 @@
      *  - Does not allow you to send /w a transaction.
      */
     void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId,
int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
 
         if( destination.isTemporary() && isDeleted(destination) ) {
             throw new JMSException("Cannot publish to a deleted Destination: "+destination);
@@ -1561,6 +1577,16 @@
                 System.exit(0);
             }
         }
+    }
+    
+    protected void transportFailed(Throwable error){
+        transportFailed.set(true);
+        try{
+            cleanup();
+        }catch(JMSException e){
+           log.warn("Cleanup failed",e);
+        }
+        
     }
 
     public void setCopyMessageOnSend(boolean copyMessageOnSend) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
Tue Feb  7 13:40:23 2006
@@ -73,7 +73,7 @@
     }
 
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED,
asyncDispatch);
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
Tue Feb  7 13:40:23 2006
@@ -35,6 +35,10 @@
         initCause(cause);
         setLinkedException(cause);
     }
+    
+    public ConnectionFailedException() {
+        super("The JMS connection has failed due to a Transport problem");
+    }
 
     static private String extractMessage(IOException cause) {
         String m = cause.getMessage();



Mime
View raw message