qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1562444 - in /qpid/trunk/qpid/java: amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/ amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/
Date Wed, 29 Jan 2014 13:53:54 GMT
Author: rgodfrey
Date: Wed Jan 29 13:53:53 2014
New Revision: 1562444

URL: http://svn.apache.org/r1562444
Log:
QPID-5522 : TransactionController waits endlessly when the TCP/IP connection is lost.
Detect link / session / connection failures when sending to, and expecting a response from,
the transaction controller.

Modified:
    qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1562444&r1=1562443&r2=1562444&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Wed Jan 29 13:53:53 2014
@@ -38,15 +38,8 @@ import javax.jms.Queue;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.apache.qpid.amqp_1_0.client.ChannelsExhaustedException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.client.Transaction;
+
+import org.apache.qpid.amqp_1_0.client.*;
 import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
 import org.apache.qpid.amqp_1_0.jms.QueueSender;
 import org.apache.qpid.amqp_1_0.jms.QueueSession;
@@ -150,7 +143,17 @@ public class SessionImpl implements Sess
         });
         if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
         {
-            _txn = _session.createSessionLocalTransaction();
+            try
+            {
+                _txn = _session.createSessionLocalTransaction();
+            }
+            catch (LinkDetachedException e)
+            {
+                JMSException jmsException = new JMSException("Unable to create transactional session");
+                jmsException.setLinkedException(e);
+                jmsException.initCause(e);
+                throw jmsException;
+            }
         }
 
         _messageFactory = new MessageFactory(this);
@@ -236,14 +239,23 @@ public class SessionImpl implements Sess
         checkClosed();
         checkTransactional();
 
-        _txn.commit();
-        for(MessageConsumerImpl consumer : _consumers)
+        try
         {
-            consumer.postCommit();
-        }
+            _txn.commit();
+            for(MessageConsumerImpl consumer : _consumers)
+            {
+                consumer.postCommit();
+            }
 
-        _txn = _session.createSessionLocalTransaction();
-        //TODO
+            _txn = _session.createSessionLocalTransaction();
+        }
+        catch (LinkDetachedException e)
+        {
+            final JMSException jmsException = new JMSException("Unable to commit transaction");
+            jmsException.setLinkedException(e);
+            jmsException.initCause(e);
+            throw jmsException;
+        }
     }
 
     public void rollback() throws JMSException
@@ -251,16 +263,24 @@ public class SessionImpl implements Sess
         checkClosed();
         checkTransactional();
 
-        _txn.rollback();
-
-        for(MessageConsumerImpl consumer : _consumers)
+        try
         {
-            consumer.postRollback();
-        }
+            _txn.rollback();
 
-        _txn = _session.createSessionLocalTransaction();
+            for(MessageConsumerImpl consumer : _consumers)
+            {
+                consumer.postRollback();
+            }
 
-        //TODO
+            _txn = _session.createSessionLocalTransaction();
+        }
+        catch (LinkDetachedException e)
+        {
+            final JMSException jmsException = new JMSException("Unable to rollback transaction");
+            jmsException.setLinkedException(e);
+            jmsException.initCause(e);
+            throw jmsException;
+        }
     }
 
     private void checkTransactional() throws JMSException

Modified: qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java?rev=1562444&r1=1562443&r2=1562444&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java Wed Jan 29 13:53:53 2014
@@ -230,6 +230,10 @@ public class Receive extends Util
         {
             e.printStackTrace();  //TODO.
         }
+        catch (LinkDetachedException e)
+        {
+            e.printStackTrace();
+        }
 
     }
 

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1562444&r1=1562443&r2=1562444&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Wed Jan 29 13:53:53 2014
@@ -345,7 +345,7 @@ public class Session
     }
 
 
-    public Transaction createSessionLocalTransaction()
+    public Transaction createSessionLocalTransaction() throws LinkDetachedException
     {
         TransactionController localController = getSessionLocalTransactionController();
         return localController.beginTransaction();

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java?rev=1562444&r1=1562443&r2=1562444&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java Wed Jan 29 13:53:53 2014
@@ -32,12 +32,12 @@ public class Transaction
         _txnId = txnId;
     }
 
-    public void commit()
+    public void commit() throws LinkDetachedException
     {
         _transactionController.commit(this);
     }
 
-    public void rollback()
+    public void rollback() throws LinkDetachedException
     {
         _transactionController.rollback(this);
     }

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java?rev=1562444&r1=1562443&r2=1562444&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java Wed Jan 29 13:53:53 2014
@@ -21,14 +21,17 @@ package org.apache.qpid.amqp_1_0.client;
 
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.DeliveryState;
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
 import org.apache.qpid.amqp_1_0.type.transaction.Declare;
 import org.apache.qpid.amqp_1_0.type.transaction.Declared;
 import org.apache.qpid.amqp_1_0.type.transaction.Discharge;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 
 
 public class TransactionController implements DeliveryStateHandler
@@ -38,15 +41,30 @@ public class TransactionController imple
     private Session _session;
     private volatile DeliveryState _state;
     private boolean _received;
+    private Error _error;
 
     public TransactionController(Session session, SendingLinkEndpoint tcLinkEndpoint)
     {
         _session = session;
         _endpoint = tcLinkEndpoint;
         _endpoint.setDeliveryStateHandler(this);
+        _endpoint.setLinkEventListener(new SendingLinkListener()
+        {
+            @Override
+            public void flowStateChanged()
+            {
+                // ignore
+            }
+
+            @Override
+            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+            {
+                TransactionController.this.remoteDetached(detach);
+            }
+        });
     }
 
-    public Transaction beginTransaction()
+    public Transaction beginTransaction() throws LinkDetachedException
     {
 
 
@@ -54,7 +72,7 @@ public class TransactionController imple
         return new Transaction(this, txnId);
     }
 
-    private Binary declare()
+    private Binary declare() throws LinkDetachedException
     {
         SectionEncoder encoder = _session.getSectionEncoder();
 
@@ -87,9 +105,17 @@ public class TransactionController imple
             //TODO - rationalise sending of flows
             // _endpoint.sendFlow();
         }
+        waitForResponse();
+
+
+        return ((Declared) _state).getTxnId();
+    }
+
+    private void waitForResponse() throws LinkDetachedException
+    {
         synchronized (this)
         {
-            while(!_received)
+            while(!_received && !_endpoint.isDetached())
             {
                 try
                 {
@@ -101,23 +127,33 @@ public class TransactionController imple
                 }
             }
         }
+        if(!_received && _endpoint.isDetached())
+        {
+            throw new LinkDetachedException(_error);
+        }
+    }
 
-
-        return ((Declared) _state).getTxnId();
+    private synchronized void remoteDetached(Detach detach)
+    {
+        if(detach != null && detach.getError() != null)
+        {
+            _error = detach.getError();
+            notifyAll();
+        }
     }
 
 
-    public void commit(final Transaction transaction)
+    public void commit(final Transaction transaction) throws LinkDetachedException
     {
         discharge(transaction.getTxnId(), false);
     }
 
-    public void rollback(final Transaction transaction)
+    public void rollback(final Transaction transaction) throws LinkDetachedException
     {
         discharge(transaction.getTxnId(), true);
     }
 
-    private void discharge(final Binary txnId, final boolean fail)
+    private void discharge(final Binary txnId, final boolean fail) throws LinkDetachedException
     {
         Discharge discharge = new Discharge();
         discharge.setTxnId(txnId);
@@ -135,7 +171,7 @@ public class TransactionController imple
         final Object lock = _endpoint.getLock();
         synchronized(lock)
         {
-            while(!_endpoint.hasCreditToSend())
+            while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
             {
                 try
                 {
@@ -146,6 +182,10 @@ public class TransactionController imple
 
                 }
             }
+            if(_endpoint.isDetached())
+            {
+                throw new LinkDetachedException(_error);
+            }
             _state = null;
             _received = false;
             _endpoint.transfer(transfer);
@@ -153,20 +193,7 @@ public class TransactionController imple
             //TODO - rationalise sending of flows
             // _endpoint.sendFlow();
         }
-        synchronized (this)
-        {
-            while(!_received)
-            {
-                try
-                {
-                    wait();
-                }
-                catch (InterruptedException e)
-                {
-
-                }
-            }
-        }
+        waitForResponse();
 
 
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message