activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-2191 https://issues.apache.org/jira/browse/AMQ-3529 - rework fixes to remove uncertanty from dealing with intettuptedexception. Sync requests will trap interrupts that ocurr while waiting for
Date Fri, 27 Nov 2015 12:36:03 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 95f58fa7c -> 0a12bcb92


https://issues.apache.org/jira/browse/AMQ-2191 https://issues.apache.org/jira/browse/AMQ-3529
- rework fixes to remove uncertanty from dealing with intettuptedexception. Sync requests
will trap interrupts that ocurr while waiting for responses and fail the connection with an
interruptedioexception. Interrupts pending before requests will be suppressed, allowing possible
clean shutdown. It is not safe to replay openwire ops b/c they are not idempotent, the only
safe option is to have a teardown of the broker side state from a close


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a12bcb9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a12bcb9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a12bcb9

Branch: refs/heads/master
Commit: 0a12bcb928f151b5ace9a0cffc34ec717b6a8e9c
Parents: 95f58fa
Author: gtully <gary.tully@gmail.com>
Authored: Fri Nov 27 12:20:12 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Nov 27 12:21:56 2015 +0000

----------------------------------------------------------------------
 .../activemq/transport/vm/VMTransport.java      |   1 +
 .../org/apache/activemq/ActiveMQConnection.java |   8 -
 .../activemq/ActiveMQMessageConsumer.java       |   6 -
 .../org/apache/activemq/ActiveMQSession.java    |   4 -
 .../org/apache/activemq/TransactionContext.java |  76 ++---
 .../activemq/transport/FutureResponse.java      |  40 ++-
 .../activemq/transport/ResponseCorrelator.java  |   4 +-
 .../transport/TransportDisposedIOException.java |   4 +
 .../transport/WireFormatNegotiator.java         |  19 +-
 .../transport/failover/FailoverTransport.java   |   2 +-
 .../transport/fanout/FanoutTransport.java       |   2 +-
 .../apache/activemq/util/ThreadPoolUtils.java   |   6 +-
 .../transport/http/HttpClientTransport.java     |   1 +
 .../ActiveMQXAConnectionTxInterruptTest.java    | 275 +++++++++++++++++++
 .../org/apache/activemq/bugs/AMQ3529v2Test.java | 248 +++++++++++++++++
 .../transport/vm/VMTransportThreadSafeTest.java |   2 +-
 16 files changed, 616 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index 92c9c51..9e13cf9 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -122,6 +122,7 @@ public class VMTransport implements Transport, Task {
                 }
             }
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
             iioe.initCause(e);
             throw iioe;

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index aecece1..3b2833d 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -629,12 +629,7 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
      */
     @Override
     public void close() throws JMSException {
-        // Store the interrupted state and clear so that cleanup happens without
-        // leaking connection resources.  Reset in finally to preserve state.
-        boolean interrupted = Thread.interrupted();
-
         try {
-
             // If we were running, lets stop first.
             if (!closed.get() && !transportFailed.get()) {
                 // do not fail if already closed as according to JMS spec we must not
@@ -722,9 +717,6 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
             ServiceSupport.dispose(this.transport);
 
             factoryStats.removeConnection(this);
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index edc383f..a67022b 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -726,17 +726,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
     }
 
     void doClose() throws JMSException {
-        // Store interrupted state and clear so that Transport operations don't
-        // throw InterruptedException and we ensure that resources are cleaned up.
-        boolean interrupted = Thread.interrupted();
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
         LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         this.session.asyncSendPacket(removeCommand);
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
     }
 
     void inProgressClearRequired() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 5c7cc4f..6603a2f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -660,14 +660,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
     }
 
     private void doClose() throws JMSException {
-        boolean interrupted = Thread.interrupted();
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         connection.asyncSendPacket(removeCommand);
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
     }
 
     final AtomicInteger clearRequestsCounter = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 27cb49d..6bd7402 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq;
 
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -29,13 +28,11 @@ import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
-import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.DataArrayResponse;
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.IntegerResponse;
 import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.Response;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
@@ -330,7 +327,7 @@ public class TransactionContext implements XAResource {
             this.transactionId = null;
             // Notify the listener that the tx was committed back
             try {
-                syncSendPacketWithInterruptionHandling(info);
+                this.connection.syncSendPacket(info);
                 if (localTransactionEventListener != null) {
                     localTransactionEventListener.commitEvent();
                 }
@@ -403,32 +400,36 @@ public class TransactionContext implements XAResource {
             if (!equals(associatedXid, xid)) {
                 throw new XAException(XAException.XAER_PROTO);
             }
-
-            // TODO: we may want to put the xid in a suspended list.
-            try {
-                beforeEnd();
-            } catch (JMSException e) {
-                throw toXAException(e);
-            } finally {
-                setXid(null);
-            }
+            invokeBeforeEnd();
         } else if ((flags & TMSUCCESS) == TMSUCCESS) {
             // set to null if this is the current xid.
             // otherwise this could be an asynchronous success call
             if (equals(associatedXid, xid)) {
-                try {
-                    beforeEnd();
-                } catch (JMSException e) {
-                    throw toXAException(e);
-                } finally {
-                    setXid(null);
-                }
+                invokeBeforeEnd();
             }
         } else {
             throw new XAException(XAException.XAER_INVAL);
         }
     }
 
+    private void invokeBeforeEnd() throws XAException {
+        boolean throwingException = false;
+        try {
+            beforeEnd();
+        } catch (JMSException e) {
+            throwingException = true;
+            throw toXAException(e);
+        } finally {
+            try {
+                setXid(null);
+            } catch (XAException ignoreIfWillMask){
+                if (!throwingException) {
+                    throw ignoreIfWillMask;
+                }
+            }
+        }
+    }
+
     private boolean equals(Xid xid1, Xid xid2) {
         if (xid1 == xid2) {
             return true;
@@ -465,7 +466,7 @@ public class TransactionContext implements XAResource {
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
 
             // Find out if the server wants to commit or rollback.
-            IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
+            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
             if (XAResource.XA_RDONLY == response.getResult()) {
                 // transaction stops now, may be syncs that need a callback
                 List<TransactionContext> l;
@@ -534,7 +535,7 @@ public class TransactionContext implements XAResource {
 
             // Let the server know that the tx is rollback.
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
-            syncSendPacketWithInterruptionHandling(info);
+            this.connection.syncSendPacket(info);
 
             List<TransactionContext> l;
             synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
@@ -581,7 +582,7 @@ public class TransactionContext implements XAResource {
             // Notify the server that the tx was committed back
             TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE
: TransactionInfo.COMMIT_TWO_PHASE);
 
-            syncSendPacketWithInterruptionHandling(info);
+            this.connection.syncSendPacket(info);
 
             List<TransactionContext> l;
             synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
@@ -643,7 +644,7 @@ public class TransactionContext implements XAResource {
 
         try {
             // Tell the server to forget the transaction.
-            syncSendPacketWithInterruptionHandling(info);
+            this.connection.syncSendPacket(info);
         } catch (JMSException e) {
             throw toXAException(e);
         }
@@ -741,7 +742,7 @@ public class TransactionContext implements XAResource {
             if (transactionId != null) {
                 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.END);
                 try {
-                    syncSendPacketWithInterruptionHandling(info);
+                    this.connection.syncSendPacket(info);
                     LOG.debug("{} ended XA transaction {}", this, transactionId);
                 } catch (JMSException e) {
                     disassociate();
@@ -774,31 +775,6 @@ public class TransactionContext implements XAResource {
     }
 
     /**
-     * Sends the given command. Also sends the command in case of interruption,
-     * so that important commands like rollback and commit are never interrupted.
-     * If interruption occurred, set the interruption state of the current
-     * after performing the action again.
-     *
-     * @return the response
-     */
-    private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException
{
-        try {
-            return this.connection.syncSendPacket(command);
-        } catch (JMSException e) {
-            if (e.getLinkedException() instanceof InterruptedIOException) {
-                try {
-                    Thread.interrupted();
-                    return this.connection.syncSendPacket(command);
-                } finally {
-                    Thread.currentThread().interrupt();
-                }
-            }
-
-            throw e;
-        }
-    }
-
-    /**
      * Converts a JMSException from the server to an XAException. if the
      * JMSException contained a linked XAException that is returned instead.
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
b/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
index ba4bd67..ff95869 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java
@@ -29,25 +29,51 @@ public class FutureResponse {
     private static final Logger LOG = LoggerFactory.getLogger(FutureResponse.class);
 
     private final ResponseCallback responseCallback;
+    private final TransportFilter transportFilter;
+
     private final ArrayBlockingQueue<Response> responseSlot = new ArrayBlockingQueue<Response>(1);
 
     public FutureResponse(ResponseCallback responseCallback) {
+        this(responseCallback, null);
+    }
+
+    public FutureResponse(ResponseCallback responseCallback, TransportFilter transportFilter)
{
         this.responseCallback = responseCallback;
+        this.transportFilter = transportFilter;
     }
 
     public Response getResult() throws IOException {
+        boolean hasInterruptPending = Thread.interrupted();
         try {
             return responseSlot.take();
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Operation interupted: " + e, e);
+            hasInterruptPending = false;
+            throw dealWithInterrupt(e);
+        } finally {
+            if (hasInterruptPending) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private InterruptedIOException dealWithInterrupt(InterruptedException e) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Operation interrupted: " + e, e);
+        }
+        InterruptedIOException interruptedIOException = new InterruptedIOException(e.getMessage());
+        interruptedIOException.initCause(e);
+        try {
+            if (transportFilter != null) {
+                transportFilter.onException(interruptedIOException);
             }
-            throw new InterruptedIOException("Interrupted.");
+        } finally {
+            Thread.currentThread().interrupt();
         }
+        return interruptedIOException;
     }
 
     public Response getResult(int timeout) throws IOException {
+        final boolean wasInterrupted = Thread.interrupted();
         try {
             Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
             if (result == null && timeout > 0) {
@@ -55,7 +81,11 @@ public class FutureResponse {
             }
             return result;
         } catch (InterruptedException e) {
-            throw new InterruptedIOException("Interrupted.");
+            throw dealWithInterrupt(e);
+        } finally {
+            if (wasInterrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
b/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
index eca76a7..ad18ea6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
@@ -64,7 +64,7 @@ public class ResponseCorrelator extends TransportFilter {
         Command command = (Command) o;
         command.setCommandId(sequenceGenerator.getNextSequenceId());
         command.setResponseRequired(true);
-        FutureResponse future = new FutureResponse(responseCallback);
+        FutureResponse future = new FutureResponse(responseCallback, this);
         IOException priorError = null;
         synchronized (requestMap) {
             priorError = this.error;
@@ -122,7 +122,7 @@ public class ResponseCorrelator extends TransportFilter {
      * any of current requests. Lets let them know of the problem.
      */
     public void onException(IOException error) {
-        dispose(error);
+        dispose(new TransportDisposedIOException("Disposed due to prior exception", error));
         super.onException(error);
     }
     

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
b/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
index 632fc05..1d57777 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
@@ -38,4 +38,8 @@ public class TransportDisposedIOException extends IOException {
         super(message);
     }
 
+    public TransportDisposedIOException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
b/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
index c86c6ed..fe3b179 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
@@ -93,13 +93,25 @@ public class WireFormatNegotiator extends TransportFilter {
     }
 
     public void oneway(Object command) throws IOException {
+        boolean wasInterrupted = Thread.interrupted();
         try {
-            if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
+            if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout,
TimeUnit.MILLISECONDS)) {
                 throw new IOException("Wire format negotiation timeout: peer did not send
his wire format.");
             }
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new InterruptedIOException();
+            InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted
waiting for wire format negotiation");
+            interruptedIOException.initCause(e);
+            try {
+                onException(interruptedIOException);
+            } finally {
+                Thread.currentThread().interrupt();
+                wasInterrupted = false;
+            }
+            throw interruptedIOException;
+        }  finally {
+            if (wasInterrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
         super.oneway(command);
     }
@@ -143,6 +155,7 @@ public class WireFormatNegotiator extends TransportFilter {
         } catch (IOException e) {
             onException(e);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             onException((IOException)new InterruptedIOException().initCause(e));
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 0f36d67..7f7d7c6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -130,7 +130,7 @@ public class FailoverTransport implements CompositeTransport {
     private String nestedExtraQueryOptions;
     private boolean shuttingDown = false;
 
-    public FailoverTransport() throws InterruptedIOException {
+    public FailoverTransport() {
         brokerSslContext = SslContext.getCurrentSslContext();
         stateTracker.setTrackTransactions(true);
         // Setup a task that is used to reconnect the a connection async.

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
index 0d933e5..00ae7ae 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
@@ -157,7 +157,7 @@ public class FanoutTransport implements CompositeTransport {
         }
     }
 
-    public FanoutTransport() throws InterruptedIOException {
+    public FanoutTransport() {
         // Setup a task that is used to reconnect the a connection async.
         reconnectTaskFactory = new TaskRunnerFactory();
         reconnectTaskFactory.init();

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
index 1a3dc34..27b69fc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
@@ -124,7 +124,11 @@ public final class ThreadPoolUtils {
                     warned = true;
                     LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.",
executorService);
                     // we were interrupted during shutdown, so force shutdown
-                    executorService.shutdownNow();
+                    try {
+                        executorService.shutdownNow();
+                    } finally {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
index 4715d02..c65dbb9 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
@@ -184,6 +184,7 @@ public class HttpClientTransport extends HttpTransportSupport {
                             Thread.sleep(1000);
                         } catch (InterruptedException e) {
                             onException(new InterruptedIOException());
+                            Thread.currentThread().interrupt();
                             break;
                         }
                     } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java
new file mode 100644
index 0000000..a297121
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+import javax.jms.XASession;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.MutableBrokerFilter;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.transaction.Synchronization;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.*;
+
+public class ActiveMQXAConnectionTxInterruptTest {
+    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionTxInterruptTest.class);
+    long txGenerator = System.currentTimeMillis();
+    private BrokerService broker;
+    XASession session;
+    XAResource resource;
+    ActiveMQXAConnection xaConnection;
+    Destination dest;
+
+    @Before
+    public void startBrokerEtc() throws Exception {
+        broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/BRXA"));
+        broker.setPersistent(false);
+        broker.start();
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" +
broker.getTransportConnectors().get(0).getConnectUri() + ")");
+        cf1.setStatsEnabled(true);
+        xaConnection = (ActiveMQXAConnection)cf1.createConnection();
+        xaConnection.start();
+        session = xaConnection.createXASession();
+        resource = session.getXAResource();
+
+        dest = new ActiveMQQueue("Q");
+
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            xaConnection.close();
+        } catch (Throwable ignore) {
+        }
+        try {
+            broker.stop();
+        } catch (Throwable ignore) {
+        }
+    }
+
+
+    @Test
+    public void testRollbackAckInterrupted() throws Exception {
+
+        // publish a message
+        publishAMessage();
+        Xid tid;
+
+        // consume in tx and rollback with interrupt
+        session = xaConnection.createXASession();
+        final MessageConsumer consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        ((TransactionContext)resource).addSynchronization(new Synchronization() {
+            @Override
+            public void beforeEnd() throws Exception {
+                LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source"));
+                Thread.currentThread().interrupt();
+            }
+        });
+        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+        assertNotNull(receivedMessage);
+        assertEquals(getName(), receivedMessage.getText());
+        resource.end(tid, XAResource.TMFAIL);
+        resource.rollback(tid);
+        session.close();
+        assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
+    }
+
+    @Test
+    public void testCommitAckInterrupted() throws Exception {
+
+        // publish a message
+        publishAMessage();
+
+        // consume in tx and rollback with interrupt
+        session = xaConnection.createXASession();
+        MessageConsumer consumer = session.createConsumer(dest);
+        Xid tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        ((TransactionContext)resource).addSynchronization(new Synchronization() {
+            @Override
+            public void beforeEnd() throws Exception {
+                LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source"));
+                Thread.currentThread().interrupt();
+            }
+        });
+        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+        assertNotNull(receivedMessage);
+        assertEquals(getName(), receivedMessage.getText());
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        session.close();
+
+    }
+
+    @Test
+    public void testInterruptWhilePendingResponseToAck() throws Exception {
+
+        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+        final CountDownLatch blockedServerSize = new CountDownLatch(1);
+        final CountDownLatch canContinue = new CountDownLatch(1);
+        MutableBrokerFilter filter = (MutableBrokerFilter)broker.getBroker().getAdaptor(MutableBrokerFilter.class);
+        filter.setNext(new MutableBrokerFilter(filter.getNext()) {
+            @Override
+            public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack)
throws Exception {
+                blockedServerSize.countDown();
+                canContinue.await();
+                super.acknowledge(consumerExchange, ack);
+            }
+        });
+
+        publishAMessage();
+
+        // consume in tx and rollback with interrupt while pending reply
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    session = xaConnection.createXASession();
+                    MessageConsumer consumer = session.createConsumer(dest);
+                    Xid tid = createXid();
+                    resource = session.getXAResource();
+                    resource.start(tid, XAResource.TMNOFLAGS);
+
+                    TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+                    assertNotNull(receivedMessage);
+                    assertEquals(getName(), receivedMessage.getText());
+
+                    try {
+                        resource.end(tid, XAResource.TMSUCCESS);
+                        fail("Expect end to fail");
+                    } catch (Throwable expectedWithInterrupt) {
+                        assertTrue(expectedWithInterrupt instanceof XAException);
+                        assertCause(expectedWithInterrupt, new Class[]{InterruptedException.class});
+                    }
+
+                    try {
+                        resource.rollback(tid);
+                        fail("Expect rollback to fail due to connection being closed");
+                    } catch (Throwable expectedWithInterrupt) {
+                        assertTrue(expectedWithInterrupt instanceof XAException);
+                        assertCause(expectedWithInterrupt, new Class[]{ConnectionClosedException.class,
InterruptedException.class});
+                    }
+                    session.close();
+
+                    assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
+
+                } catch (Throwable error) {
+                    error.printStackTrace();
+                    errors.add(error);
+                }
+            }
+        });
+
+        assertTrue("got to blocking call", blockedServerSize.await(20, TimeUnit.SECONDS));
+
+        // will interrupt
+        executorService.shutdownNow();
+        canContinue.countDown();
+
+        assertTrue("job done", executorService.awaitTermination(20, TimeUnit.SECONDS));
+
+        assertTrue("no errors: " + errors, errors.isEmpty());
+    }
+
+    private void assertCause(Throwable expectedWithInterrupt, Class[] exceptionClazzes) {
+        Throwable candidate = expectedWithInterrupt;
+
+        while (candidate != null) {
+            for (Class<?> exceptionClazz: exceptionClazzes) {
+                if (exceptionClazz.isInstance(candidate)) {
+                    return;
+                }
+            }
+            candidate = candidate.getCause();
+        }
+        LOG.error("ex", expectedWithInterrupt);
+        fail("no expected type as cause:" + expectedWithInterrupt);
+    }
+
+    public Xid createXid() throws IOException {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 87;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+
+    }
+
+    private void publishAMessage() throws IOException, XAException, JMSException {
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message  = new ActiveMQTextMessage();
+        message.setText(getName());
+        producer.send(message);
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        session.close();
+    }
+
+
+    private String getName() {
+        return this.getClass().getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java
new file mode 100644
index 0000000..030f2b4
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.activemq.bugs;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ3529v2Test {
+
+    private static Logger LOG = LoggerFactory.getLogger(AMQ3529v2Test.class);
+
+    private BrokerService broker;
+    private String connectionUri;
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test(timeout = 60000)
+    public void testRandomInterruptionAffects() throws Exception {
+        doTestRandomInterruptionAffects();
+    }
+
+    @Test(timeout = 60000)
+    public void testRandomInterruptionAffectsWithFailover() throws Exception {
+        connectionUri = "failover:(" + connectionUri + ")";
+        doTestRandomInterruptionAffects();
+    }
+
+    public void doTestRandomInterruptionAffects() throws Exception {
+        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+
+        ThreadGroup tg = new ThreadGroup("tg");
+
+        assertEquals(0, tg.activeCount());
+
+        class ClientThread extends Thread {
+
+            public Exception error;
+
+            public ClientThread(ThreadGroup tg, String name) {
+                super(tg, name);
+            }
+
+            @Override
+            public void run() {
+                Context ctx = null;
+                Connection connection = null;
+                Session session = null;
+                MessageConsumer consumer = null;
+
+                try {
+                    connection = connectionFactory.createConnection();
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    assertNotNull(session);
+
+                    Properties props = new Properties();
+                    props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+                    props.setProperty(Context.PROVIDER_URL, connectionUri);
+                    ctx = null;
+                    try {
+                        ctx = new InitialContext(props);
+                    } catch (NoClassDefFoundError e) {
+                        throw new NamingException(e.toString());
+                    } catch (Exception e) {
+                        throw new NamingException(e.toString());
+                    }
+                    Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C");
+                    consumer = session.createConsumer(destination);
+                    consumer.receive(10000);
+                } catch (Exception e) {
+                    // Expect an exception here from the interrupt.
+                } finally {
+                    try {
+                        if (consumer != null) {
+                            consumer.close();
+                        }
+                    } catch (JMSException e) {
+                        trackException("Consumer Close failed with", e);
+                    }
+                    try {
+                        if (session != null) {
+                            session.close();
+                        }
+                    } catch (JMSException e) {
+                        trackException("Session Close failed with", e);
+                    }
+                    try {
+                        if (connection != null) {
+                            connection.close();
+                        }
+                    } catch (JMSException e) {
+                        trackException("Connection Close failed with", e);
+                    }
+                    try {
+                        if (ctx != null) {
+                            ctx.close();
+                        }
+                    } catch (Exception e) {
+                        trackException("Connection Close failed with", e);
+                    }
+                }
+            }
+
+            private void trackException(String s, Exception e) {
+                LOG.error(s, e);
+                this.error = e;
+            }
+        }
+
+        final Random random = new Random();
+        List<ClientThread> threads = new LinkedList<ClientThread>();
+        for (int i=0;i<10;i++) {
+            threads.add(new ClientThread(tg, "Client-"+ i));
+        }
+        for (Thread thread : threads) {
+            thread.start();
+        }
+        // interrupt the threads at some random time
+        ExecutorService doTheInterrupts = Executors.newFixedThreadPool(threads.size());
+        for (final Thread thread : threads) {
+            doTheInterrupts.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(random.nextInt(5000));
+                    } catch (InterruptedException ignored) {
+                        ignored.printStackTrace();
+                    }
+                    thread.interrupt();
+                }
+            });
+        }
+        doTheInterrupts.shutdown();
+        assertTrue("all interrupts done", doTheInterrupts.awaitTermination(30, TimeUnit.SECONDS));
+
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        for (ClientThread thread : threads) {
+            if (thread.error != null) {
+                LOG.info("Close error on thread: " + thread, thread.error);
+            }
+        }
+
+        Thread[] remainThreads = new Thread[tg.activeCount()];
+        tg.enumerate(remainThreads);
+        for (final Thread t : remainThreads) {
+            if (t != null && t.isAlive() && !t.isDaemon())
+                assertTrue("Thread completes:" + t, Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        LOG.info("Remaining thread: " + t.toString());
+                        return !t.isAlive();
+                    }
+                }));
+        }
+
+        ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
+        while (root.getParent() != null) {
+            root = root.getParent();
+        }
+        visit(root, 0);
+    }
+
+    // This method recursively visits all thread groups under `group'.
+    public static void visit(ThreadGroup group, int level) {
+        // Get threads in `group'
+        int numThreads = group.activeCount();
+        Thread[] threads = new Thread[numThreads * 2];
+        numThreads = group.enumerate(threads, false);
+
+        // Enumerate each thread in `group'
+        for (int i = 0; i < numThreads; i++) {
+            // Get thread
+            Thread thread = threads[i];
+            LOG.debug("Thread:" + thread.getName() + " is still running");
+        }
+
+        // Get thread subgroups of `group'
+        int numGroups = group.activeGroupCount();
+        ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+        numGroups = group.enumerate(groups, false);
+
+        // Recursively visit each subgroup
+        for (int i = 0; i < numGroups; i++) {
+            visit(groups[i], level + 1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a12bcb9/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
index c5c4706..eccbf1b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
@@ -318,7 +318,7 @@ public class VMTransportThreadSafeTest {
         // simulate broker stop
         remote.stop();
 
-        assertTrue(Wait.waitFor(new Wait.Condition() {
+        assertTrue("got expected exception response", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 LOG.info("answer: " + answer[0]);


Mime
View raw message