activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r759276 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/store/kahadb/ test/java/org/apache/activemq/broker/store/ test/java/org/apache/activemq/broker/store/kahadb/
Date Fri, 27 Mar 2009 17:32:23 GMT
Author: chirino
Date: Fri Mar 27 17:32:23 2009
New Revision: 759276

URL: http://svn.apache.org/viewvc?rev=759276&view=rev
Log:
Adding a small perf benchmark for Stores.

Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=759276&r1=759275&r2=759276&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Fri Mar 27 17:32:23 2009
@@ -21,12 +21,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
 import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.Marshaller;
@@ -103,21 +103,21 @@
 
     public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, final int max) throws IOException {
         final ArrayList<QueueRecord> rc = new ArrayList<QueueRecord>(max);
-        queueIndex.visit(tx, new BTreeVisitor.GTEVisitor<Long, QueueRecord>(firstQueueKey) {
-            @Override
-            public boolean isInterestedInKeysBetween(Long first, Long second) {
-                if (rc.size() >= max)
-                    return false;
-                return super.isInterestedInKeysBetween(first, second);
-            }
-
-            @Override
-            protected void matched(Long key, QueueRecord value) {
-                if (rc.size() >= max)
-                    return;
-                rc.add(value);
+        
+        Iterator<Entry<Long, QueueRecord>> iterator;
+        if( firstQueueKey!=null ) {
+            iterator = queueIndex.iterator(tx, firstQueueKey);
+        } else {
+            iterator = queueIndex.iterator(tx);
+        }
+        while (iterator.hasNext()) {
+            if( rc.size() >= max ) {
+                break;
             }
-        });
+            Map.Entry<Long, QueueRecord> entry = iterator.next();
+            rc.add(entry.getValue());
+        }
+        
         return rc.iterator();
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=759276&r1=759275&r2=759276&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Fri Mar 27 17:32:23 2009
@@ -748,7 +748,6 @@
                 if( tx!=null ) {
                     tx.commit();
                 }
-                store(updates);
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             } finally {
@@ -757,6 +756,12 @@
                     tx=null;
                 }
             }
+            
+            try {
+                store(updates);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            }
         }
         
         ///////////////////////////////////////////////////////////////
@@ -827,9 +832,11 @@
             Long queueKey = destination.nextQueueKey();
             QueueAddMessageBean bean = new QueueAddMessageBean();
             bean.setQueueName(queueName);
-            bean.setAttachment(record.getAttachment());
-            bean.setMessageKey(record.getMessageKey());
             bean.setQueueKey(queueKey);
+            bean.setMessageKey(record.getMessageKey());
+            if( record.getAttachment()!=null ) {
+                bean.setAttachment(record.getAttachment());
+            }
             updates.add(bean);
             return queueKey;
         }
@@ -845,7 +852,7 @@
                 throw new KeyNotFoundException("queue key: "+queueName);
             }
             try {
-                return destination.listMessages(tx, firstQueueKey, max);
+                return destination.listMessages(tx(), firstQueueKey, max);
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             }

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=759276&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Fri Mar 27 17:32:23 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueRecord;
+import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.VoidCallback;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+
+public abstract class StorePerformanceBase extends TestCase {
+
+    private static final int PERFORMANCE_SAMPLES = 30000;
+    private Store store;
+    private AsciiBuffer queueName;
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+    abstract protected Store createStore();
+    abstract protected boolean isStoreTransactional();
+    abstract protected boolean isStorePersistent();
+
+    @Override
+    protected void setUp() throws Exception {
+        store = createStore();
+        store.start();
+        
+        queueName = new AsciiBuffer("test");
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                session.queueAdd(queueName);
+            }
+        }, null);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (store != null) {
+            store.stop();
+        }
+    }
+
+    private final Object wakeupMutex = new Object(); 
+    
+    class Producer implements Runnable {
+        private Thread thread;
+        private AtomicBoolean stopped = new AtomicBoolean();
+        private String name;
+        protected final MetricCounter rate = new MetricCounter();
+        private long sleep;
+
+        public Producer(String name) {
+            this.name=name;
+        }
+        public void start() {
+            rate.name("Producer " + name + " Rate");
+            totalProducerRate.add(rate);
+            thread = new Thread(this, "Producer"+ name);
+            thread.start();
+        }
+        public void stop() throws InterruptedException {
+            stopped.set(true);
+            thread.join();
+        }
+        public void run() {
+            try {
+                Buffer buffer = new Buffer(new byte[1024]);
+                for( long i=0; !stopped.get(); i++ ) {
+                    
+                    final MessageRecord messageRecord = new MessageRecord();
+                    messageRecord.setMessageId(new AsciiBuffer(""+i));
+                    messageRecord.setEncoding(new AsciiBuffer("encoding"));
+                    messageRecord.setBuffer(buffer);
+
+                    store.execute(new VoidCallback<Exception>() {
+                        @Override
+                        public void run(Session session) throws Exception {
+                            Long messageKey = session.messageAdd(messageRecord);
+                            QueueRecord queueRecord = new Store.QueueRecord();
+                            queueRecord.setMessageKey(messageKey);
+                            session.queueAddMessage(queueName, queueRecord);
+                        }
+                    }, null);
+                    rate.increment();
+                    synchronized(wakeupMutex){
+                        wakeupMutex.notify();
+                    }
+                    
+                    if( sleep>0 ) {
+                        Thread.sleep(sleep);
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    
+    class Consumer implements Runnable {
+        private Thread thread;
+        private AtomicBoolean stopped = new AtomicBoolean();
+        protected final MetricCounter rate = new MetricCounter();
+        private String name;
+
+        public Consumer(String name) {
+            this.name=name;
+        }
+        public void start() {
+            rate.name("Consumer " + name + " Rate");
+            totalConsumerRate.add(rate);
+            thread = new Thread(this, "Consumer " + name );
+            thread.start();
+        }
+        public void stop() throws InterruptedException {
+            stopped.set(true);
+            thread.join();
+        }
+        
+        public void run() {
+            try {
+                while( !stopped.get() ) {
+                    ArrayList<MessageRecord> records = store.execute(new Callback<ArrayList<MessageRecord>, Exception>() {
+                        public ArrayList<MessageRecord> execute(Session session) throws Exception {
+                            ArrayList<MessageRecord> rc = new ArrayList<MessageRecord>(1000);
+                            Iterator<QueueRecord> queueRecords = session.queueListMessagesQueue(queueName, null, 1000);
+                                for (Iterator<QueueRecord> iterator = queueRecords; iterator.hasNext();) {
+                                QueueRecord r = iterator.next();
+                                rc.add(session.messageGetRecord(r.getMessageKey()));
+                                session.queueRemoveMessage(queueName, r.queueKey);
+                            }
+                            return rc;
+                        }
+                    }, null);
+                    rate.increment(records.size());
+                    if( records.isEmpty() ) {
+                        synchronized(wakeupMutex){
+                            wakeupMutex.wait(500);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    
+    public void test1_1_1() throws Exception {
+
+        Producer p = new  Producer("1");
+        Consumer c = new  Consumer("1");
+        p.start();
+        c.start();
+        
+        reportRates();
+        
+        p.stop();
+        c.stop();
+        
+    }
+    
+    private void reportRates() throws InterruptedException {
+        System.out.println("Checking rates for test: " + getName());
+        for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+            Period p = new Period();
+            Thread.sleep(1000 * 5);
+            System.out.println(totalProducerRate.getRateSummary(p));
+            System.out.println(totalConsumerRate.getRateSummary(p));
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+        }
+    }
+    
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java?rev=759276&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java Fri Mar 27 17:32:23 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.File;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StorePerformanceBase;
+
+public class KahaDBStorePerformance extends StorePerformanceBase {
+
+    @Override
+    protected Store createStore() {
+        KahaDBStore rc = new KahaDBStore();
+        rc.setDirectory(new File("target/test-data/kahadb-store-performance"));
+        rc.setDeleteAllMessages(true);
+        return rc;
+    }
+
+    @Override
+    protected boolean isStorePersistent() {
+        return true;
+    }
+
+    @Override
+    protected boolean isStoreTransactional() {
+        return true;
+    }
+
+}



Mime
View raw message