activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r697976 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/index/ main/java/org/apache/kahadb/journal/ main/java/org/apache/kahadb/store/ test/java/org/apache/kahadb/store/perf/
Date Mon, 22 Sep 2008 20:27:25 GMT
Author: chirino
Date: Mon Sep 22 13:27:24 2008
New Revision: 697976

URL: http://svn.apache.org/viewvc?rev=697976&view=rev
Log:
Fixed an issue in message recovery where the indexes were inconsistent due to a message getting
assigned 2 sequence ids.
Added some improved toString() methods to aid during debugging.
The Btree was not properly linking leaf nodes together on a split in some situations.


Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Mon Sep 22
13:27:24 2008
@@ -420,6 +420,7 @@
             } else {
                 rNode.setLeafData(rightKeys, rightValues);
                 lNode.setLeafData(leftKeys, leftValues);
+                lNode.setNext(rNode.getPageId());
             }
 
             Key[] v = createKeyArray(1);
@@ -550,7 +551,7 @@
     public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
         BTreeNode<Key, Value> node = this;
         while( node.isBranch() ) {
-            node = node.getChild(tx, children.length-1);
+            node = node.getChild(tx, node.children.length-1);
         }
         if( node.values.length>0 ) {
             int idx = node.values.length-1;

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Mon Sep 22
13:27:24 2008
@@ -375,10 +375,13 @@
         dataFile.unlink();
         if (archiveDataLogs) {
             dataFile.move(getDirectoryArchive());
-            LOG.info("moved data file " + dataFile + " to " + getDirectoryArchive());
+            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
         } else {
-            boolean result = dataFile.delete();
-            LOG.info("discarding data file " + dataFile + (result ? "successful " : "failed"));
+            if ( dataFile.delete() ) {
+            	LOG.debug("Discarded data file " + dataFile);
+            } else {
+            	LOG.warn("Failed to discard data file " + dataFile.getFile());
+            }
         }
     }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java Mon Sep
22 13:27:24 2008
@@ -104,9 +104,7 @@
     }
 
     public String toString() {
-        String result = "offset = " + offset + ", file = " + dataFileId + ", size = " + size
+ ", type = "
-                        + type;
-        return result;
+        return dataFileId+":"+offset;
     }
 
     public void writeExternal(DataOutput dos) throws IOException {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Mon
Sep 22 13:27:24 2008
@@ -30,6 +30,7 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -150,8 +151,8 @@
     protected File directory;
     protected boolean recovering;
     protected Thread checkpointThread;
-    protected boolean syncWrites;
-    int checkpointInterval = 1000;
+    protected boolean syncWrites=true;
+    int checkpointInterval = 5*1000;
     int cleanupInterval = 30*1000;
     
     protected AtomicBoolean started = new AtomicBoolean();
@@ -590,21 +591,28 @@
 
         // Add the message.
         long id = sd.nextMessageId++;
-        sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
-        sd.locationIndex.put(tx, location, id);
-        sd.messageIdIndex.put(tx, command.getMessageId(), id);
+        Long previous = sd.locationIndex.put(tx, location, id);
+        if( previous == null ) {
+            sd.messageIdIndex.put(tx, command.getMessageId(), id);
+            sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+        } else {
+            // restore the previous value.. Looks like this was a redo of a previously
+            // added message.  We don't want to assing it a new id as the other indexes would

+            // be wrong..
+            sd.locationIndex.put(tx, location, previous);
+        }
+        
     }
 
     private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation)
throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         if (!command.hasSubscriptionKey()) {
+            
             // In the queue case we just remove the message from the index..
             Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
             if (sequenceId != null) {
                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-                if( keys!=null ) {
-                    sd.locationIndex.remove(tx, keys.location);
-                }
+                sd.locationIndex.remove(tx, keys.location);
             }
         } else {
             // In the topic case we need remove the message once it's been acked
@@ -698,6 +706,7 @@
             // Find empty journal files to remove.
             final HashSet<Integer> inUseFiles = new HashSet<Integer>();
             for (StoredDestination sd : storedDestinations.values()) {
+                
                 // Use a visitor to cut down the number of pages that we load
                 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                     int last=-1;
@@ -739,7 +748,6 @@
         LOG.debug("Checkpoint done.");
     }
 
-
     // /////////////////////////////////////////////////////////////////
     // StoredDestination related implementation methods.
     // /////////////////////////////////////////////////////////////////
@@ -761,6 +769,11 @@
             this.messageId=messageId;
             this.location=location;
         }
+        
+        @Override
+        public String toString() {
+            return "["+messageId+","+location+"]";
+        }
     }
     
     static protected class MessageKeysMarshaller implements Marshaller<MessageKeys>
{

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
(original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
Mon Sep 22 13:27:24 2008
@@ -19,10 +19,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -48,15 +51,14 @@
 public class KahaBulkLoadingTest extends JmsTestSupport {
 
     private static final Log LOG = LogFactory.getLog(KahaBulkLoadingTest.class);
-    
-    protected int messageSize = 1024 * 64;
-    protected int produceCount = 10000;
+
+    protected int messageSize = 1024 * 4;
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
         KahaDBPersistenceAdaptor kaha = new KahaDBPersistenceAdaptor();
         kaha.setDirectory(new File("target/activemq-data/kahadb"));
-        kaha.deleteAllMessages();
+        // kaha.deleteAllMessages();
         broker.setPersistenceAdapter(kaha);
         broker.addConnector("tcp://localhost:0");
         return broker;
@@ -69,40 +71,75 @@
     }
 
     public void testQueueSendThenAddConsumer() throws Exception {
-        ProgressPrinter printer = new ProgressPrinter(produceCount, 20);
-
+        long start;
+        long end;
         ActiveMQDestination destination = new ActiveMQQueue("TEST");
 
         connection.setUseCompression(false);
         connection.getPrefetchPolicy().setAll(10);
         connection.start();
-        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(destination);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
-        LOG.info("Sending " + produceCount + " messages that are " + (messageSize / 1024.0)
+ "k large, for a total of " + (produceCount * messageSize / (1024.0 * 1024.0))
-                 + " megs of data.");
-        // Send a message to the broker.
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < produceCount; i++) {
-            printer.increment();
-            BytesMessage msg = session.createBytesMessage();
-            msg.writeBytes(new byte[messageSize]);
-            producer.send(msg);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.info("Receiving messages that are in the queue");
+        MessageConsumer consumer = session.createConsumer(destination);
+        BytesMessage msg = (BytesMessage)consumer.receive(2000);
+        int consumed = 0;
+        if( msg!=null ) {
+            consumed++;
         }
-        long end1 = System.currentTimeMillis();
+        while (true) {
+            int counter = 0;
+            if (msg == null) {
+                break;
+            }
+            end = start = System.currentTimeMillis();
+            int size = 0;
+            while ((end - start) < 5000) {
+                msg = (BytesMessage)consumer.receive(5000);
+                if (msg == null) {
+                    break;
+                }
+                counter++;
+                consumed++;
+                end = System.currentTimeMillis();
+                size += msg.getBodyLength();
+            }
+            LOG.info("Consumed: " + (counter * 1000.0 / (end - start)) + " " + " messages/sec,
" + (1.0 * size / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec ");
+        }
+        consumer.close();
+        LOG.info("Consumed " + consumed + " messages from the queue.");
 
-        LOG.info("Produced messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
-        printer = new ProgressPrinter(produceCount, 10);
+        LOG.info("Sending messages that are " + (messageSize / 1024.0) + "k large");
+        // Send a message to the broker.
         start = System.currentTimeMillis();
-        MessageConsumer consumer = session.createConsumer(destination);
-        for (int i = 0; i < produceCount; i++) {
-            printer.increment();
-            assertNotNull("Getting message: " + i, consumer.receive(20000));
+
+        final AtomicBoolean stop = new AtomicBoolean();
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                stop.set(true);
+            }
+        });
+
+        int produced = 0;
+        while (!stop.get()) {
+            end = start = System.currentTimeMillis();
+            int produceCount = 0;
+            while ((end - start) < 5000 && !stop.get()) {
+                BytesMessage bm = session.createBytesMessage();
+                bm.writeBytes(new byte[messageSize]);
+                producer.send(bm);
+                produceCount++;
+                produced++;
+                end = System.currentTimeMillis();
+            }
+            LOG.info("Produced: " + (produceCount * 1000.0 / (end - start)) + " messages/sec,
" + (1.0 * produceCount * messageSize / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) +
" megs/sec");
         }
-        end1 = System.currentTimeMillis();
-        LOG.info("Consumed messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
+        LOG.info("Prodcued " + produced + " messages to the queue.");
 
     }
 



Mime
View raw message