activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6131
Date Mon, 18 Jan 2016 22:45:10 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 57b65dc8e -> 7d3a71a4d


https://issues.apache.org/jira/browse/AMQ-6131

Fix check for durable sub with no pending messages during checkpoint
cleanup.
(cherry picked from commit 193f6be6878502f3db8563465872a1afd86b7c54)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7d3a71a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7d3a71a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7d3a71a4

Branch: refs/heads/activemq-5.13.x
Commit: 7d3a71a4dff336b6f660d7f51661829b325e034b
Parents: 57b65dc
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Jan 18 17:43:18 2016 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Jan 18 17:45:05 2016 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |   7 +-
 .../org/apache/activemq/bugs/AMQ6131Test.java   | 290 +++++++++++++++++++
 2 files changed, 295 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7d3a71a4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 5c0801b..0f23356 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -853,7 +853,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                 Sequence seq = dataFile.getCorruptedBlocks().getHead();
                 while (seq != null) {
-                    BTreeVisitor.BetweenVisitor visitor = new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
+                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
+                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                     missingPredicates.add(visitor);
                     knownCorruption.add(visitor);
                     seq = seq.getNext();
@@ -1707,7 +1708,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
                             // When pending is size one that is the next message Id meaning there
                             // are no pending messages currently.
-                            if (pendingAcks == null || pendingAcks.size() <= 1) {
+                            if (pendingAcks == null || pendingAcks.isEmpty() ||
+                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
+
                                 if (LOG.isTraceEnabled()) {
                                     LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
                                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7d3a71a4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6131Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6131Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6131Test.java
new file mode 100644
index 0000000..2abd33f
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6131Test.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.MessageDatabase;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class is to show that a durable can lose messages after index deletion.
+ */
+public class AMQ6131Test {
+
+    protected BrokerService broker;
+    protected URI brokerConnectURI;
+
+    @Before
+    public void startBroker() throws Exception {
+        org.apache.log4j.Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
+        setUpBroker(true);
+    }
+
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+
+        broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(clearDataDir);
+
+        // set up a transport
+        TransportConnector connector = broker.addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected BrokerService getBroker() {
+        return this.broker;
+    }
+
+    public File getPersistentDir() throws IOException {
+        return getBroker().getPersistenceAdapter().getDirectory();
+    }
+
+    @Test(timeout = 300000)
+    public void testDurableWithOnePendingAfterRestartAndIndexRecovery() throws Exception {
+        final File persistentDir = getPersistentDir();
+
+        broker.getBroker().addDestination(broker.getAdminConnectionContext(), new ActiveMQTopic("durable.sub"), false);
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerConnectURI);
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.setClientID("myId");
+        connection.start();
+        final Session jmsSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+        TopicSubscriber durable = jmsSession.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
+        final MessageProducer producer = jmsSession.createProducer(new ActiveMQTopic("durable.sub"));
+
+        final int original = new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size();
+
+        // 100k messages
+        final byte[] data = new byte[100000];
+        final Random random = new Random();
+        random.nextBytes(data);
+
+        // run test with enough messages to create a second journal file
+        final AtomicInteger messageCount = new AtomicInteger();
+        assertTrue("Should have added a journal file", Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                final ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+                message.setContent(new ByteSequence(data));
+
+                for (int i = 0; i < 100; i++) {
+                    producer.send(message);
+                    messageCount.getAndIncrement();
+                }
+
+                return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() > original;
+            }
+        }));
+
+        // Consume all but 1 message
+        for (int i = 0; i < messageCount.get() - 1; i++) {
+            durable.receive();
+        }
+
+        durable.close();
+
+        // wait until a journal file has been GC'd after receiving messages
+        assertTrue("Subscription should go inactive", Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
+            }
+        }));
+
+        // force a GC of unneeded journal files
+        getBroker().getPersistenceAdapter().checkpoint(true);
+
+        // wait until a journal file has been GC'd after receiving messages
+        assertFalse("Should not have garbage collected", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() == original;
+            }
+        }, 5000, 500));
+
+        // stop the broker so we can blow away the index
+        getBroker().stop();
+        getBroker().waitUntilStopped();
+
+        // delete the index so that the durables are gone from the index
+        // The test passes if you take out this delete section
+        for (File index : FileUtils.listFiles(persistentDir, new WildcardFileFilter("db.*"), TrueFileFilter.INSTANCE)) {
+            FileUtils.deleteQuietly(index);
+        }
+
+        stopBroker();
+        setUpBroker(false);
+
+        assertEquals(1, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
+        assertEquals(0, broker.getAdminView().getDurableTopicSubscribers().length);
+
+        ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(this.brokerConnectURI);
+        ActiveMQConnection connection2 = (ActiveMQConnection) connectionFactory2.createConnection();
+        connection2.setClientID("myId");
+        connection2.start();
+        final Session jmsSession2 = connection2.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+        TopicSubscriber durable2 = jmsSession2.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
+
+        assertEquals(0, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
+        assertEquals(1, broker.getAdminView().getDurableTopicSubscribers().length);
+
+        assertNotNull(durable2.receive(5000));
+    }
+
+    @Test(timeout = 300000)
+    public void testDurableWithNoMessageAfterRestartAndIndexRecovery() throws Exception {
+        final File persistentDir = getPersistentDir();
+
+        broker.getBroker().addDestination(broker.getAdminConnectionContext(), new ActiveMQTopic("durable.sub"), false);
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerConnectURI);
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.setClientID("myId");
+        connection.start();
+        final Session jmsSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+        TopicSubscriber durable = jmsSession.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
+        final MessageProducer producer = jmsSession.createProducer(new ActiveMQTopic("durable.sub"));
+
+        final int original = new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size();
+
+        // 100k messages
+        final byte[] data = new byte[100000];
+        final Random random = new Random();
+        random.nextBytes(data);
+
+        // run test with enough messages to create a second journal file
+        final AtomicInteger messageCount = new AtomicInteger();
+        assertTrue("Should have added a journal file", Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                final ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+                message.setContent(new ByteSequence(data));
+
+                for (int i = 0; i < 100; i++) {
+                    producer.send(message);
+                    messageCount.getAndIncrement();
+                }
+
+                return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() > original;
+            }
+        }));
+
+        // Consume all messages
+        for (int i = 0; i < messageCount.get(); i++) {
+            durable.receive();
+        }
+
+        durable.close();
+
+        assertTrue("Subscription should go inactive", Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
+            }
+        }));
+
+        // force a GC of unneeded journal files
+        getBroker().getPersistenceAdapter().checkpoint(true);
+
+        // wait until a journal file has been GC'd after receiving messages
+        assertTrue("Should have garbage collected", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() == original;
+            }
+        }));
+
+        // stop the broker so we can blow away the index
+        getBroker().stop();
+        getBroker().waitUntilStopped();
+
+        // delete the index so that the durables are gone from the index
+        // The test passes if you take out this delete section
+        for (File index : FileUtils.listFiles(persistentDir, new WildcardFileFilter("db.*"), TrueFileFilter.INSTANCE)) {
+            FileUtils.deleteQuietly(index);
+        }
+
+        stopBroker();
+        setUpBroker(false);
+
+        assertEquals(1, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
+        assertEquals(0, broker.getAdminView().getDurableTopicSubscribers().length);
+
+        ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(this.brokerConnectURI);
+        ActiveMQConnection connection2 = (ActiveMQConnection) connectionFactory2.createConnection();
+        connection2.setClientID("myId");
+        connection2.start();
+        final Session jmsSession2 = connection2.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+        TopicSubscriber durable2 = jmsSession2.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
+
+        assertEquals(0, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
+        assertEquals(1, broker.getAdminView().getDurableTopicSubscribers().length);
+
+        assertNull(durable2.receive(500));
+    }
+}
\ No newline at end of file


Mime
View raw message