activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r813927 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Date Fri, 11 Sep 2009 17:10:49 GMT
Author: rajdavies
Date: Fri Sep 11 17:10:49 2009
New Revision: 813927

URL: http://svn.apache.org/viewvc?rev=813927&view=rev
Log:
patch applied for https://issues.apache.org/activemq/browse/AMQ-2191

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=813927&r1=813926&r2=813927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Fri Sep 11 17:10:49 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -27,11 +28,13 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.DataArrayResponse;
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.IntegerResponse;
 import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
@@ -280,7 +283,7 @@
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.COMMIT_ONE_PHASE);
             this.transactionId = null;
             // Notify the listener that the tx was committed back
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
             if (localTransactionEventListener != null) {
                 localTransactionEventListener.commitEvent();
             }
@@ -399,7 +402,7 @@
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
 
             // Find out if the server wants to commit or rollback.
-            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
+            IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
             return response.getResult();
 
         } catch (JMSException e) {
@@ -433,7 +436,7 @@
 
             // Let the server know that the tx is rollback.
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
 
             List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
             if (l != null && !l.isEmpty()) {
@@ -472,7 +475,7 @@
             // Notify the server that the tx was committed back
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE
: TransactionInfo.COMMIT_TWO_PHASE);
 
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
 
             List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
             if (l != null && !l.isEmpty()) {
@@ -509,7 +512,7 @@
 
         try {
             // Tell the server to forget the transaction.
-            this.connection.syncSendPacket(info);
+            syncSendPacketWithInterruptionHandling(info);
         } catch (JMSException e) {
             throw toXAException(e);
         }
@@ -601,7 +604,7 @@
             if (transactionId != null) {
                 TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
                 try {
-                    this.connection.syncSendPacket(info);
+                    syncSendPacketWithInterruptionHandling(info);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Ended XA transaction: " + transactionId);
                     }
@@ -628,6 +631,31 @@
     }
 
     /**
+     * Sends the given command. Also sends the command in case of interruption,
+     * so that important commands like rollback and commit are never interrupted.
+     * If interruption occurred, set the interruption state of the current 
+     * after performing the action again. 
+     * 
+     * @return the response
+     */
+    private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException
{
+    	try {
+			return this.connection.syncSendPacket(command);
+		} catch (JMSException e) {
+			if (e.getLinkedException() instanceof InterruptedIOException) {
+				try {
+					Thread.interrupted();
+					return this.connection.syncSendPacket(command);
+				} finally {
+					Thread.currentThread().interrupt();
+				}				
+			}
+			
+			throw e;
+		}
+    }
+
+    /**
      * Converts a JMSException from the server to an XAException. if the
      * JMSException contained a linked XAException that is returned instead.
      * 



Mime
View raw message