Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0C5C2176A7 for ; Thu, 12 Feb 2015 23:22:21 +0000 (UTC) Received: (qmail 74598 invoked by uid 500); 12 Feb 2015 23:22:21 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 74555 invoked by uid 500); 12 Feb 2015 23:22:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 74544 invoked by uid 99); 12 Feb 2015 23:22:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Feb 2015 23:22:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CB6E5E03EB; Thu, 12 Feb 2015 23:22:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hadrian@apache.org To: commits@activemq.apache.org Date: Thu, 12 Feb 2015 23:22:21 -0000 Message-Id: <0bec762ea2cb4b2db14574179b8e2588@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/6] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test case applied with thanks. https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test case applied with thanks. Revert "resolve https://issues.apache.org/activemq/browse/AMQ-2736, logic issue in code that keeps data files with acks around pending message file gc. thanks jgenender - test case to follow" This reverts commit dd68c61e65f24b7dc498b36e34960a4bc46ded4b. resolves: https://issues.apache.org/jira/browse/AMQ-5542 and applies test case that nicely demonstrates the defect, thanks Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e828dc79 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e828dc79 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e828dc79 Branch: refs/heads/activemq-5.11.x Commit: e828dc791f59691d2d0ba2ce169082e4c1984139 Parents: b997bd4 Author: gtully Authored: Fri Jan 30 14:45:26 2015 +0000 Committer: Hadrian Zbarcea Committed: Thu Feb 12 18:18:29 2015 -0500 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 3 +- .../org/apache/activemq/bugs/AMQ2832Test.java | 51 ++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e828dc79/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 35a59ce..477f42c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1672,14 +1672,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (LOG.isTraceEnabled()) { LOG.trace("gc candidates: " + gcCandidateSet); } - final TreeSet gcCandidates = new TreeSet(gcCandidateSet); Iterator candidates = gcCandidateSet.iterator(); while (candidates.hasNext()) { Integer candidate = candidates.next(); Set referencedFileIds = metadata.ackMessageFileMap.get(candidate); if (referencedFileIds != null) { for (Integer referencedFileId : referencedFileIds) { - if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) { + if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { // active file that is not targeted for deletion is referenced so don't delete candidates.remove(); break; http://git-wip-us.apache.org/repos/asf/activemq/blob/e828dc79/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java index 319fcc2..22ad6ab 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java @@ -17,6 +17,7 @@ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -117,7 +118,57 @@ public class AMQ2832Test { } } + /** + * Scenario: + * db-1.log has an unacknowledged message, + * db-2.log contains acks for the messages from db-1.log, + * db-3.log contains acks for the messages from db-2.log + * + * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup. + * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed. + * + * @throws Exception + */ @Test + public void testAckChain() throws Exception { + startBroker(); + + StagedConsumer consumer = new StagedConsumer(); + // file #1 + produceMessagesToConsumeMultipleDataFiles(5); + // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log + consumer.receive(3); + + // send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together + // this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one + produceAndConsumeImmediately(20, consumer); + consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed + + // now we have 3 files written and started with #4 + consumer.close(); + + broker.stop(); + broker.waitUntilStopped(); + + recoverBroker(); + + consumer = new StagedConsumer(); + Message message = consumer.receive(1); + assertNotNull("One message stays unacked from db-1.log", message); + message.acknowledge(); + message = consumer.receive(1); + assertNull("There should not be any unconsumed messages any more", message); + consumer.close(); + } + + private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception { + for (int i = 0; i < numOfMsgs; i++) { + produceMessagesToConsumeMultipleDataFiles(1); + consumer.receive(1).acknowledge(); + } + } + + @Test public void testAckRemovedMessageReplayedAfterRecovery() throws Exception { startBroker();