Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 36679 invoked from network); 1 Apr 2010 23:25:47 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 1 Apr 2010 23:25:47 -0000 Received: (qmail 82669 invoked by uid 500); 1 Apr 2010 23:25:47 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 82629 invoked by uid 500); 1 Apr 2010 23:25:47 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 82622 invoked by uid 99); 1 Apr 2010 23:25:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Apr 2010 23:25:47 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Apr 2010 23:25:42 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 15199238897A; Thu, 1 Apr 2010 23:25:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r930135 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/ Date: Thu, 01 Apr 2010 23:25:20 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100401232521.15199238897A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Thu Apr 1 23:25:20 2010 New Revision: 930135 URL: http://svn.apache.org/viewvc?rev=930135&view=rev Log: resolve https://issues.apache.org/activemq/browse/AMQ-2556 - resolve leaks with XA_RDONLY - prepare needs to cleanup the transaction state, both on the broker and on the client connection/session/failover state Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=930135&r1=930134&r2=930135&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Thu Apr 1 23:25:20 2010 @@ -426,6 +426,18 @@ public class TransactionContext implemen // Find out if the server wants to commit or rollback. IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); + if (XAResource.XA_RDONLY == response.getResult()) { + // transaction stops now, may be syncs that need a callback + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid); + } + for (TransactionContext ctx : l) { + ctx.afterCommit(); + } + } + } return response.getResult(); } catch (JMSException e) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=930135&r1=930134&r2=930135&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Apr 1 23:25:20 2010 @@ -31,6 +31,9 @@ import java.util.concurrent.atomic.Atomi import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.transaction.xa.XAResource; + import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.RegionBroker; @@ -389,7 +392,7 @@ public class TransportConnection impleme } TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); if (transactionState == null) { - throw new IllegalStateException("Cannot prepare a transaction that had not been started: " + throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " + info.getTransactionId()); } // Avoid dups. @@ -397,6 +400,10 @@ public class TransportConnection impleme transactionState.setPrepared(true); int result = broker.prepareTransaction(context, info.getTransactionId()); transactionState.setPreparedResult(result); + if (result == XAResource.XA_RDONLY) { + // we are done, no further rollback or commit from TM + cs.removeTransactionState(info.getTransactionId()); + } IntegerResponse response = new IntegerResponse(result); return response; } else { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=930135&r1=930134&r2=930135&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Apr 1 23:25:20 2010 @@ -25,6 +25,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import javax.jms.TransactionRolledBackException; +import javax.transaction.xa.XAResource; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionId; @@ -34,6 +35,7 @@ import org.apache.activemq.command.Consu import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.IntegerResponse; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; @@ -79,19 +81,34 @@ public class ConnectionStateTracker exte } }; - private class RemoveTransactionAction implements Runnable { + private class RemoveTransactionAction implements ResponseHandler { private final TransactionInfo info; public RemoveTransactionAction(TransactionInfo info) { this.info = info; } - public void run() { + public void onResponse(Command response) { ConnectionId connectionId = info.getConnectionId(); ConnectionState cs = connectionStates.get(connectionId); cs.removeTransactionState(info.getTransactionId()); } } + + private class PrepareReadonlyTransactionAction extends RemoveTransactionAction { + + public PrepareReadonlyTransactionAction(TransactionInfo info) { + super(info); + } + + public void onResponse(Command command) { + IntegerResponse response = (IntegerResponse) command; + if (XAResource.XA_RDONLY == response.getResult()) { + // all done, no commit or rollback from TM + super.onResponse(command); + } + } + } /** * @@ -469,10 +486,10 @@ public class ConnectionStateTracker exte TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); if (transactionState != null) { transactionState.addCommand(info); + return new Tracked(new PrepareReadonlyTransactionAction(info)); } } } - return TRACKED_RESPONSE_MARKER; } return null; } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java?rev=930135&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java Thu Apr 1 23:25:20 2010 @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.state; + +import org.apache.activemq.command.Command; + +public interface ResponseHandler { + public void onResponse(Command command); +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java?rev=930135&r1=930134&r2=930135&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java Thu Apr 1 23:25:20 2010 @@ -16,25 +16,26 @@ */ package org.apache.activemq.state; +import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; public class Tracked extends Response { - private Runnable runnable; + private ResponseHandler handler; - public Tracked(Runnable runnable) { - this.runnable = runnable; + public Tracked(ResponseHandler runnable) { + this.handler = runnable; } - public void onResponses() { - if (runnable != null) { - runnable.run(); - runnable = null; + public void onResponses(Command command) { + if (handler != null) { + handler.onResponse(command); + handler = null; } } public boolean isWaitingForResponse() { - return runnable != null; + return handler != null; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=930135&r1=930134&r2=930135&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Apr 1 23:25:20 2010 @@ -155,7 +155,7 @@ public class FailoverTransport implement object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); } if (object != null && object.getClass() == Tracked.class) { - ((Tracked) object).onResponses(); + ((Tracked) object).onResponses(command); } } if (!initialized) { @@ -1011,6 +1011,10 @@ public class FailoverTransport implement } } + public ConnectionStateTracker getStateTracker() { + return stateTracker; + } + private boolean contains(URI newURI) { boolean result = false; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java?rev=930135&r1=930134&r2=930135&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java Thu Apr 1 23:25:20 2010 @@ -21,6 +21,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.concurrent.CopyOnWriteArrayList; import javax.jms.Connection; import javax.jms.Destination; @@ -37,11 +38,20 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.management.JMSConnectionStatsImpl; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.failover.FailoverTransport; import org.apache.activemq.transport.stomp.StompTransportFilter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -248,7 +258,89 @@ public class ActiveMQXAConnectionFactory resource.commit(tid, true); } - + + public void testReadonlyNoLeak() throws Exception { + final String brokerName = "readOnlyNoLeak"; + BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName)); + broker.setPersistent(false); + broker.start(); + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")"); + cf1.setStatsEnabled(true); + ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf1.createConnection(); + xaConnection.start(); + XASession session = xaConnection.createXASession(); + XAResource resource = session.getXAResource(); + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + session.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + assertTransactionGoneFromBroker(tid); + assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid); + assertSessionGone(xaConnection, session); + assertTransactionGoneFromFailoverState(xaConnection, tid); + + // two phase + session = xaConnection.createXASession(); + resource = session.getXAResource(); + tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + session.close(); + resource.end(tid, XAResource.TMSUCCESS); + assertEquals(XAResource.XA_RDONLY, resource.prepare(tid)); + + // no need for a commit on read only + assertTransactionGoneFromBroker(tid); + assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid); + assertSessionGone(xaConnection, session); + assertTransactionGoneFromFailoverState(xaConnection, tid); + + xaConnection.close(); + broker.stop(); + + } + + private void assertTransactionGoneFromFailoverState( + ActiveMQXAConnection connection1, Xid tid) throws Exception { + + FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class); + TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE); + assertNull("transaction shold not exist in the state tracker", + transport.getStateTracker().processCommitTransactionOnePhase(info)); + } + + private void assertSessionGone(ActiveMQXAConnection connection1, + XASession session) { + JMSConnectionStatsImpl stats = (JMSConnectionStatsImpl)connection1.getStats(); + // should be no dangling sessions maintained by the transaction + assertEquals("should be no sessions", 0, stats.getSessions().length); + } + + private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); + CopyOnWriteArrayList connections = broker.getTransportConnectors().get(0).getConnections(); + for (TransportConnection connection: connections) { + if (connection.getConnectionId().equals(clientId)) { + try { + connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE)); + fail("did not get expected excepton on missing transaction, it must be still there in error!"); + } catch (IllegalStateException expectedOnNoTransaction) { + } + } + } + } + + private void assertTransactionGoneFromBroker(Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup("localhost"); + TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class); + try { + transactionBroker.getTransaction(null, new XATransactionId(tid), false); + fail("expecte ex on tx not found"); + } catch (XAException expectedOnNotFound) { + } + } + protected void assertCreateConnection(String uri) throws Exception { // Start up a broker with a tcp connector. BrokerService broker = new BrokerService();