Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5799DE575 for ; Mon, 21 Jan 2013 11:37:47 +0000 (UTC) Received: (qmail 85751 invoked by uid 500); 21 Jan 2013 11:37:47 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 85612 invoked by uid 500); 21 Jan 2013 11:37:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 85595 invoked by uid 99); 21 Jan 2013 11:37:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Jan 2013 11:37:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Jan 2013 11:37:42 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C9BFF23889BB; Mon, 21 Jan 2013 11:37:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1436291 - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/bugs/ activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/ Date: Mon, 21 Jan 2013 11:37:23 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130121113723.C9BFF23889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Mon Jan 21 11:37:23 2013 New Revision: 1436291 URL: http://svn.apache.org/viewvc?rev=1436291&view=rev Log: 
https://issues.apache.org/jira/browse/AMQ-4172 - resolved, with test. In-flight transactions need to protect only the range of data files they reference, rather than all subsequent data files, so that gc can reclaim whatever is valid to reclaim. Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java (with props) Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java?rev=1436291&r1=1436290&r2=1436291&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java Mon Jan 21 11:37:23 2013 @@ -54,7 +54,7 @@ public class AMQ2736Test { KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); KahaDBStore store = pa.getStore(); - assertNotNull("last tx location is present " + store.getFirstInProgressTxLocation()); + assertNotNull("last tx location is present " + store.getInProgressTxLocationRange()[1]); // test hack, close the journal to ensure no further journal updates when broker stops // mimic kill -9 in terms of no normal shutdown sequence @@ -74,7 +74,7 @@ public class AMQ2736Test { store = pa.getStore(); // inflight non xa tx should be rolledback on recovery - assertNull("in progress tx location is present ", store.getFirstInProgressTxLocation()); + assertNull("in progress tx location is present ", store.getInProgressTxLocationRange()[0]); } Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java?rev=1436291&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java Mon Jan 21 11:37:23 2013 @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertTrue; + +public class TransactedStoreUsageSuspendResumeTest { + private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class); + + private static final int MAX_MESSAGES = 10000; + + private static final String QUEUE_NAME = "test.queue"; + + private BrokerService broker; + + private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES); + private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES); + private final CountDownLatch consumerStartLatch = new CountDownLatch(1); + + private class ConsumerThread extends Thread { + + @Override + public void run() { + try { + + consumerStartLatch.await(30, TimeUnit.SECONDS); + + ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + // wait for producer to stop + long currentSendCount; + do { + currentSendCount = messagesSentCountDown.getCount(); + TimeUnit.SECONDS.sleep(5); + } while 
(currentSendCount != messagesSentCountDown.getCount()); + + LOG.info("Starting consumer at: " + currentSendCount); + + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + + do { + Message message = consumer.receive(1000); + if (message != null) { + session.commit(); + messagesReceivedCountDown.countDown(); + } + if (messagesReceivedCountDown.getCount() % 500 == 0) { + LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount()); + } + } while (messagesReceivedCountDown.getCount() != 0); + consumer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + } + + @Before + public void setup() throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + + KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter(); + kahaDB.setJournalMaxFileLength(500 * 1024); + kahaDB.setCleanupInterval(10*1000); + broker.setPersistenceAdapter(kahaDB); + + broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test + public void testTransactedStoreUsageSuspendResume() throws Exception { + + ConsumerThread thread = new ConsumerThread(); + thread.start(); + ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); + sendExecutor.execute(new Runnable() { + @Override + public void run() { + try { + sendMessages(); + } catch (Exception ignored) { + } + } + }); + sendExecutor.shutdown(); + sendExecutor.awaitTermination(5, TimeUnit.MINUTES); + + boolean allMessagesReceived = messagesReceivedCountDown.await(120, TimeUnit.SECONDS); + assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived); + } + + private void sendMessages() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + 
factory.setAlwaysSyncSend(true); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination queue = session.createQueue(QUEUE_NAME); + Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain"); + MessageProducer producer = session.createProducer(null); + + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[10]); + + for (int i=0; i<4240; i++) { + // mostly fill the store with retained messages + // so consumer only has a small bit of store usage to work with + producer.send(retainQueue, message); + session.commit(); + } + + consumerStartLatch.countDown(); + for (int i = 0; i < MAX_MESSAGES; i++) { + producer.send(queue, message); + if (i>0 && i%20 == 0) { + session.commit(); + } + messagesSentCountDown.countDown(); + if (i>0 && i%500 == 0) { + LOG.info("Sent : " + i); + } + + } + session.commit(); + producer.close(); + session.close(); + connection.close(); + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1436291&r1=1436290&r2=1436291&view=diff ============================================================================== --- 
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Jan 21 11:37:23 2013 @@ -29,6 +29,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -421,7 +422,7 @@ public abstract class MessageDatabase ex try { if( pageFile != null && pageFile.isLoaded() ) { metadata.state = CLOSED_STATE; - metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); + metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; if (metadata.page != null) { pageFile.tx().execute(new Transaction.Closure() { @@ -440,30 +441,36 @@ public abstract class MessageDatabase ex // public for testing @SuppressWarnings("rawtypes") - public Location getFirstInProgressTxLocation() { - Location l = null; + public Location[] getInProgressTxLocationRange() { + Location[] range = new Location[]{null, null}; synchronized (inflightTransactions) { if (!inflightTransactions.isEmpty()) { for (List ops : inflightTransactions.values()) { if (!ops.isEmpty()) { - l = ops.get(0).getLocation(); - break; + trackMaxAndMin(range, ops); } } } if (!preparedTransactions.isEmpty()) { for (List ops : preparedTransactions.values()) { if (!ops.isEmpty()) { - Location t = ops.get(0).getLocation(); - if (l==null || t.compareTo(l) <= 0) { - l = t; - } - break; + trackMaxAndMin(range, ops); } } } } - return l; + return range; + } + + private void trackMaxAndMin(Location[] range, List ops) { + Location t = ops.get(0).getLocation(); + if (range[0]==null || t.compareTo(range[0]) <= 0) { + range[0] = t; + } + t = ops.get(ops.size() -1).getLocation(); + if (range[1]==null || t.compareTo(range[1]) >= 0) { + range[1] = t; + } } class TranInfo 
{ @@ -1385,11 +1392,12 @@ public abstract class MessageDatabase ex LOG.debug("Checkpoint started."); // reflect last update exclusive of current checkpoint - Location firstTxLocation = metadata.lastUpdate; + Location lastUpdate = metadata.lastUpdate; metadata.state = OPEN_STATE; metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); - metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); + Location[] inProgressTxRange = getInProgressTxLocationRange(); + metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); @@ -1399,7 +1407,11 @@ public abstract class MessageDatabase ex final TreeSet gcCandidateSet = new TreeSet(completeFileSet); if (LOG.isTraceEnabled()) { - LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet); + LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); + } + + if (lastUpdate != null) { + gcCandidateSet.remove(lastUpdate.getDataFileId()); } // Don't GC files under replication @@ -1411,25 +1423,14 @@ public abstract class MessageDatabase ex gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId()); } - // Don't GC files after the first in progress tx - if( metadata.firstInProgressTransactionLocation!=null ) { - if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) { - firstTxLocation = metadata.firstInProgressTransactionLocation; + // Don't GC files referenced by in-progress tx + if (inProgressTxRange[0] != null) { + for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { + gcCandidateSet.remove(pendingTx); } } - - if( firstTxLocation!=null ) { - while( !gcCandidateSet.isEmpty() ) { - Integer last = gcCandidateSet.last(); - if( last >= firstTxLocation.getDataFileId() ) { - gcCandidateSet.remove(last); - } else { - break; - } 
- } - if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet); - } + if (LOG.isTraceEnabled()) { + LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); } // Go through all the destinations to see if any of them can remove GC candidates.