activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1027451 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
Date Tue, 26 Oct 2010 10:32:31 GMT
Author: dejanb
Date: Tue Oct 26 10:32:31 2010
New Revision: 1027451

URL: http://svn.apache.org/viewvc?rev=1027451&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2982 - sticky kahadb log files on rollback

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java?rev=1027451&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java Tue
Oct 26 10:32:31 2010
@@ -0,0 +1,166 @@
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ2982Test {
+
+    private static final int MAX_MESSAGES = 500;
+
+    private static final String QUEUE_NAME = "test.queue";
+
+    private BrokerService broker;
+
+    private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
+
+    private CleanableKahaDBStore kahaDB;
+
+    private static class CleanableKahaDBStore extends KahaDBStore {
+        // make checkpoint cleanup accessible
+        public void forceCleanup() throws IOException {
+            checkpointCleanup(true);
+        }
+
+        public int getFileMapSize() throws IOException {
+            // ensure save memory publishing, use the right lock
+            indexLock.readLock().lock();
+            try {
+                return getJournal().getFileMap().size();
+            } finally {
+                indexLock.readLock().unlock();
+            }
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+
+        kahaDB = new CleanableKahaDBStore();
+        kahaDB.setJournalMaxFileLength(256 * 1024);
+        broker.setPersistenceAdapter(kahaDB);
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    private Connection registerDLQMessageListener() throws Exception {
+        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(session
+                .createQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+        consumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                messageCountDown.countDown();
+            }
+        });
+
+        return connection;
+    }
+
+    class ConsumerThread extends Thread {
+
+        @Override
+        public void run() {
+            try {
+                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+
+                RedeliveryPolicy policy = new RedeliveryPolicy();
+                policy.setMaximumRedeliveries(0);
+                policy.setInitialRedeliveryDelay(100);
+                policy.setUseExponentialBackOff(false);
+
+                factory.setRedeliveryPolicy(policy);
+
+                Connection connection = factory.createConnection();
+                connection.start();
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+                do {
+                    Message message = consumer.receive(300);
+                    if (message != null) {
+                        session.rollback();
+                    }
+                } while (messageCountDown.getCount() != 0);
+                consumer.close();
+                session.close();
+                connection.close();
+            } catch (Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        }
+    }
+
+    private void sendMessages() throws Exception {
+        ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < MAX_MESSAGES; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[1000]);
+            producer.send(message);
+        }
+        producer.close();
+        session.close();
+        connection.close();
+    }
+
+    @Test
+    public void testNoStickyKahaDbLogFilesOnLocalTransactionRollback() throws Exception {
+
+        Connection dlqConnection = registerDLQMessageListener();
+
+        ConsumerThread thread = new ConsumerThread();
+        thread.start();
+
+        sendMessages();
+
+        thread.join(60 * 1000);
+        assertFalse(thread.isAlive());
+
+        dlqConnection.close();
+
+        kahaDB.forceCleanup();
+
+        assertEquals("only one active KahaDB log file after cleanup is expected", 1, kahaDB.getFileMapSize());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+}



Mime
View raw message