activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1214888 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/store/kahadb/
Date Thu, 15 Dec 2011 17:44:48 GMT
Author: gtully
Date: Thu Dec 15 17:44:47 2011
New Revision: 1214888

URL: http://svn.apache.org/viewvc?rev=1214888&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3637 - NullPointerException while loading node from
kahadb during vmcursor replay. Recovery is now batched so that expiry can be processed between
batches, which avoids a nested, modifying kahadb transaction. Adds a test and some additional
logging for when recovery is taking a long time.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1214888&r1=1214887&r2=1214888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Dec 15 17:44:47 2011
@@ -2224,7 +2224,7 @@ public class BrokerService implements Se
         if (ioExceptionHandler != null) {
             ioExceptionHandler.handle(exception);
          } else {
-            LOG.info("Ignoring IO exception, " + exception, exception);
+            LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception,
exception);
          }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1214888&r1=1214887&r2=1214888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Dec 15 17:44:47 2011
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -25,6 +24,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -231,6 +231,79 @@ public class Queue extends BaseDestinati
         }
     }
 
+    class BatchMessageRecoveryListener implements MessageRecoveryListener {
+        final LinkedList<Message> toExpire = new LinkedList<Message>();
+        final double totalMessageCount;
+        int recoveredAccumulator = 0;
+        int currentBatchCount;
+
+        BatchMessageRecoveryListener(int totalMessageCount) {
+            this.totalMessageCount = totalMessageCount;
+            currentBatchCount = recoveredAccumulator;
+        }
+
+        public boolean recoverMessage(Message message) {
+            recoveredAccumulator++;
+            if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
+                LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + "
has recovered "
+                        + recoveredAccumulator + " messages. " +
+                        (int) (recoveredAccumulator * 100 / totalMessageCount) + "% complete");
+            }
+            // Message could have expired while it was being
+            // loaded..
+            if (message.isExpired() && broker.isExpired(message)) {
+                toExpire.add(message);
+                return true;
+            }
+            if (hasSpace()) {
+                message.setRegionDestination(Queue.this);
+                messagesLock.writeLock().lock();
+                try {
+                    try {
+                        messages.addMessageLast(message);
+                    } catch (Exception e) {
+                        LOG.error("Failed to add message to cursor", e);
+                    }
+                } finally {
+                    messagesLock.writeLock().unlock();
+                }
+                destinationStatistics.getMessages().increment();
+                return true;
+            }
+            return false;
+        }
+
+        public boolean recoverMessageReference(MessageId messageReference) throws Exception
{
+            throw new RuntimeException("Should not be called.");
+        }
+
+        public boolean hasSpace() {
+            return true;
+        }
+
+        public boolean isDuplicate(MessageId id) {
+            return false;
+        }
+
+        public void reset() {
+            currentBatchCount = recoveredAccumulator;
+        }
+
+        public void processExpired() {
+            for (Message message: toExpire) {
+                messageExpired(createConnectionContext(), createMessageReference(message));
+                // drop message will decrement so counter
+                // balance here
+                destinationStatistics.getMessages().increment();
+            }
+            toExpire.clear();
+        }
+
+        public boolean done() {
+            return currentBatchCount == recoveredAccumulator;
+        }
+    }
+
     @Override
     public void initialize() throws Exception {
         if (this.messages == null) {
@@ -263,60 +336,15 @@ public class Queue extends BaseDestinati
             messages.setMaxProducersToAudit(getMaxProducersToAudit());
             messages.setUseCache(isUseCache());
             messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
+            final int messageCount = store.getMessageCount();
             if (messages.isRecoveryRequired()) {
-                store.recover(new MessageRecoveryListener() {
-                    double totalMessageCount = store.getMessageCount();
-                    int recoveredMessageCount = 0;
-
-                    public boolean recoverMessage(Message message) {
-                        // Message could have expired while it was being
-                        // loaded..
-                        if ((++recoveredMessageCount % 50000) == 0) {
-                            LOG.info("cursor for " + getActiveMQDestination().getQualifiedName()
+ " has recovered "
-                                    + recoveredMessageCount + " messages. " +
-                                    (int)(recoveredMessageCount*100/totalMessageCount) +
"% complete");
-                        }
-                        if (message.isExpired()) {
-                            if (broker.isExpired(message)) {
-                                messageExpired(createConnectionContext(), createMessageReference(message));
-                                // drop message will decrement so counter
-                                // balance here
-                                destinationStatistics.getMessages().increment();
-                            }
-                            return true;
-                        }
-                        if (hasSpace()) {
-                            message.setRegionDestination(Queue.this);
-                            messagesLock.writeLock().lock();
-                            try{
-                                try {
-                                    messages.addMessageLast(message);
-                                } catch (Exception e) {
-                                    LOG.error("Failed to add message to cursor", e);
-                                }
-                            }finally {
-                                messagesLock.writeLock().unlock();
-                            }
-                            destinationStatistics.getMessages().increment();
-                            return true;
-                        }
-                        return false;
-                    }
-
-                    public boolean recoverMessageReference(MessageId messageReference) throws
Exception {
-                        throw new RuntimeException("Should not be called.");
-                    }
-
-                    public boolean hasSpace() {
-                        return true;
-                    }
-
-                    public boolean isDuplicate(MessageId id) {
-                        return false;
-                    }
-                });
+                BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
+                do {
+                   listener.reset();
+                   store.recoverNextMessages(getMaxPageSize(), listener);
+                   listener.processExpired();
+               } while (!listener.done());
             } else {
-                int messageCount = store.getMessageCount();
                 destinationStatistics.getMessages().setCount(messageCount);
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1214888&r1=1214887&r2=1214888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Thu Dec 15 17:44:47 2011
@@ -470,6 +470,9 @@ public abstract class MessageDatabase ex
                     process(message, recoveryPosition, lastIndoubtPosition);
                     redoCounter++;
                     recoveryPosition = journal.getNextLocation(recoveryPosition);
+                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
+                         LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries
recovered ..");
+                     }
                 }
                 if (LOG.isInfoEnabled()) {
                     long end = System.currentTimeMillis();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java?rev=1214888&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
Thu Dec 15 17:44:47 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+
+public class KahaDBStoreRecoveryExpiryTest {
+
+    private BrokerService broker;
+    private ActiveMQConnection connection;
+    private Destination destination = new ActiveMQQueue("Test");
+    private Session session;
+
+    @Test
+    public void testRestartWitExpired() throws Exception  {
+        publishMessages(1, 0);
+        publishMessages(1, 2000);
+        publishMessages(1, 0);
+        restartBroker(3000);
+        consumeMessages(2);
+    }
+
+    @Test
+    public void testRestartWitExpiredLargerThanBatchRecovery() throws Exception  {
+        publishMessages(BaseDestination.MAX_PAGE_SIZE + 10, 2000);
+        publishMessages(10, 0);
+        restartBroker(3000);
+        consumeMessages(10);
+    }
+
+    private void consumeMessages(int count) throws Exception {
+        MessageConsumer consumer = session.createConsumer(destination);
+        for (int i=0; i<count; i++) {
+            assertNotNull("got message "+ i, consumer.receive(4000));
+        }
+        assertNull("none left over", consumer.receive(2000));
+    }
+
+    private void restartBroker(int restartDelay) throws Exception {
+        stopBroker();
+        TimeUnit.MILLISECONDS.sleep(restartDelay);
+        startBroker();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    private void publishMessages(int count, int expiry) throws Exception {
+        MessageProducer producer = session.createProducer(destination);
+        for (int i=0; i<count; i++) {
+            producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 5, expiry);
+        }
+    }
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setIndexCacheSize(0);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.setUseJmx(false);
+        broker.start();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.setWatchTopicAdvisories(false);
+        connection.start();
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message