activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r731704 [2/2] - in /activemq/trunk: activemq-core/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/main/proto/ activemq-core/src/test/java/org/apache/activemq/store/ activemq-core/src/test/java/org/apache/act...
Date Mon, 05 Jan 2009 20:48:39 GMT
Added: activemq/trunk/activemq-core/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/proto/journal-data.proto?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/proto/journal-data.proto (added)
+++ activemq/trunk/activemq-core/src/main/proto/journal-data.proto Mon Jan  5 12:48:38 2009
@@ -0,0 +1,149 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.store.kahadb.data;
+
+option java_multiple_files = true;
+option java_outer_classname = "JournalData";
+
+enum KahaEntryType {  // Tags which command message a journal entry holds.
+  //| option java_create_message="true";
+  KAHA_TRACE_COMMAND = 0;  // Value names parallel the Kaha*Command messages defined in this file.
+  KAHA_ADD_MESSAGE_COMMAND = 1;
+  KAHA_REMOVE_MESSAGE_COMMAND = 2;
+  KAHA_PREPARE_COMMAND = 3;
+  KAHA_COMMIT_COMMAND = 4;
+  KAHA_ROLLBACK_COMMAND = 5;
+  KAHA_REMOVE_DESTINATION_COMMAND = 6;
+  KAHA_SUBSCRIPTION_COMMAND = 7;  // NOTE(review): numbers appear to be written to the journal; avoid renumbering - confirm.
+}
+
+message KahaTraceCommand {  // Journal command that carries a single text message.
+  // We make use of the wonky comment style below because the following options
+  // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+  // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaTraceCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+  
+  required string message = 1;  // Trace text; its format is not defined in this file.
+}
+
+message KahaAddMessageCommand {  // Journal command recording a message added to a destination.
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddMessageCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+  
+  optional KahaTransactionInfo transaction_info=1;  // NOTE(review): presumably set only for transactional sends; confirm.
+  required KahaDestination destination = 2;  // Destination the message is added to.
+  required string messageId = 3;  // Message id in string form.
+  required bytes message = 4;  // Opaque payload bytes; the encoding is not defined in this file.
+}
+
+message KahaRemoveMessageCommand {  // Journal command recording the removal of a message from a destination.
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveMessageCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  optional KahaTransactionInfo transaction_info=1;  // NOTE(review): presumably set only for transactional removes; confirm.
+  required KahaDestination destination = 2;  // Destination the message is removed from.
+  required string messageId = 3;  // Id of the message being removed, in string form.
+  optional bytes ack = 4;  // Opaque ack bytes; NOTE(review): presumably the marshalled ack command - confirm.
+  optional string subscriptionKey = 5;  // Set if it is a topic ack.
+}
+
+message KahaPrepareCommand {  // Journal command for a transaction prepare; carries only the transaction identity.
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaPrepareCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaTransactionInfo transaction_info=1;  // Transaction this command applies to.
+}
+
+message KahaCommitCommand {  // Journal command for a transaction commit; carries only the transaction identity.
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaCommitCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaTransactionInfo transaction_info=1;  // Transaction this command applies to.
+}
+
+message KahaRollbackCommand {  // Journal command for a transaction rollback; carries only the transaction identity.
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRollbackCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaTransactionInfo transaction_info=1;  // Transaction this command applies to.
+}
+
+message KahaRemoveDestinationCommand {  // Journal command recording the removal of a whole destination.
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveDestinationCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaDestination destination = 1;  // Destination being removed.
+}
+
+message KahaSubscriptionCommand {  // Journal command describing a subscription on a destination.
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaSubscriptionCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required KahaDestination destination = 1;  // Destination the subscription belongs to.
+  required string subscriptionKey = 2;  // Key identifying the subscription; the same field name is used by KahaRemoveMessageCommand.
+  optional bool retroactive = 3;  // NOTE(review): semantics not defined in this file; confirm against the store.
+  optional bytes subscriptionInfo = 4;  // Opaque bytes; NOTE(review): presumably absent when a subscription is removed - confirm.
+}
+
+message KahaDestination {  // Identifies a destination by kind and name.
+  enum DestinationType {  // Kind of destination.
+    QUEUE = 0;
+    TOPIC = 1;
+    TEMP_QUEUE = 2;
+    TEMP_TOPIC = 3;
+  }
+
+  required DestinationType type = 1 [default = QUEUE];  // NOTE(review): the field is required, so the QUEUE default looks redundant; confirm.
+  required string name = 2;  // Destination name.
+}
+
+message KahaTransactionInfo {  // Transaction identity attached to the journal commands above.
+  optional KahaLocalTransactionId local_transaciton_id=1;  // Misspelling ("transaciton") feeds the generated accessor names; do not rename casually.
+  optional KahaXATransactionId xa_transaciton_id=2;  // NOTE(review): presumably exactly one of the local/XA ids is set; confirm.
+  optional KahaLocation previous_entry=3;  // NOTE(review): looks like a pointer to an earlier journal entry; confirm.
+}
+
+message KahaLocalTransactionId {  // Identifies a local (non-XA) transaction.
+  required string connection_id=1;  // Connection id component of the transaction id.
+  // Field numbers must be unique within a message. This field previously
+  // reused number 1, which protoc rejects and which makes the two fields
+  // share a field number on the wire.
+  required int64 transaciton_id=2;  // Misspelled name kept so generated accessors do not change.
+}
+
+message KahaXATransactionId {  // Identifies an XA transaction; field names match the parts of javax.transaction.xa.Xid.
+  required int32 format_id = 1;  // Format identifier part of the id.
+  required bytes branch_qualifier = 2;  // Branch qualifier part of the id.
+  required bytes global_transaction_id = 3;  // Global transaction id part of the id.
+}
+
+message KahaLocation {  // A position expressed as a log id plus an offset.
+  required int32 log_id = 1;  // NOTE(review): presumably the journal data file id; confirm.
+  required int32 offset = 2;  // NOTE(review): presumably the offset within that log; confirm.
+}
+
+// TODO things to ponder
+// should we move more message fields
+// that are set by the sender (and rarely required by the broker)
+// into the Properties object?

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java
Mon Jan  5 12:48:38 2009
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+
+/**
+ * Runs the inherited {@link BrokerTest} cases against a broker whose
+ * persistence adapter is a {@link KahaDBStore}.
+ * 
+ * @version $Revision: 712224 $
+ */
+public class KahaDBStoreBrokerTest extends BrokerTest {
+
+    public static Test suite() {
+        return suite(KahaDBStoreBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /** Creates a broker on a KahaDB store that has first been asked to delete all messages. */
+    protected BrokerService createBroker() throws Exception {
+        return newKahaBackedBroker(true);
+    }
+
+    /** Creates a broker on the same KahaDB directory without deleting its messages. */
+    protected BrokerService createRestartedBroker() throws Exception {
+        return newKahaBackedBroker(false);
+    }
+
+    /**
+     * Builds a broker whose persistence adapter is a KahaDB store rooted at
+     * target/activemq-data/kahadb.
+     *
+     * @param purgeStore when true, deleteAllMessages() is called on the store
+     *                   before it is attached to the broker
+     */
+    private BrokerService newKahaBackedBroker(boolean purgeStore) throws Exception {
+        BrokerService service = new BrokerService();
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(new File("target/activemq-data/kahadb"));
+        if (purgeStore) {
+            store.deleteAllMessages();
+        }
+        service.setPersistenceAdapter(store);
+        return service;
+    }
+
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
Mon Jan  5 12:48:38 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+
+/**
+ * Runs the inherited {@link RecoveryBrokerTest} cases against a broker backed
+ * by a {@link KahaDBStore}, and adds a larger queue restart scenario.
+ * 
+ * @version $Revision: 712224 $
+ */
+public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
+
+    public static Test suite() {
+        return suite(KahaDBStoreRecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /** Creates a broker on a KahaDB store that has first been asked to delete all messages. */
+    protected BrokerService createBroker() throws Exception {
+        return newKahaBackedBroker(true);
+    }
+
+    /** Creates a broker on the same KahaDB directory without deleting its messages. */
+    protected BrokerService createRestartedBroker() throws Exception {
+        return newKahaBackedBroker(false);
+    }
+
+    /**
+     * Builds a broker whose persistence adapter is a KahaDB store rooted at
+     * target/activemq-data/kahadb.
+     *
+     * @param purgeStore when true, deleteAllMessages() is called on the store
+     *                   before it is attached to the broker
+     */
+    private BrokerService newKahaBackedBroker(boolean purgeStore) throws Exception {
+        BrokerService service = new BrokerService();
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(new File("target/activemq-data/kahadb"));
+        if (purgeStore) {
+            store.deleteAllMessages();
+        }
+        service.setPersistenceAdapter(store);
+        return service;
+    }
+
+    /**
+     * Sends 10000 persistent messages to a queue, restarts the broker, consumes
+     * and acks the first half in order, restarts again, then consumes and acks
+     * the second half in order.
+     */
+    public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
+
+        final int messageCount = 10000;
+        final int halfCount = messageCount / 2;
+        ActiveMQDestination queue = new ActiveMQQueue("TEST");
+
+        // Phase 1: register a producer and send every message persistently.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Ids in send order; consumed ids are removed from the front in phase 2.
+        ArrayList<String> expectedIds = new ArrayList<String>();
+        for (int sent = 0; sent < messageCount; sent++) {
+            Message outbound = createMessage(producerInfo, queue);
+            outbound.setPersistent(true);
+            connection.send(outbound);
+            expectedIds.add(outbound.getMessageId().toString());
+        }
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        restartBroker();
+
+        // Phase 2: consume and ack the first half, checking delivery order.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, queue);
+        connection.send(consumerInfo);
+        producerInfo = createProducerInfo(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int taken = 0; taken < halfCount; taken++) {
+            Message received = receiveMessage(connection);
+            assertNotNull("Should have received message "+expectedIds.get(0)+" by now!", received);
+            assertEquals(expectedIds.remove(0), received.getMessageId().toString());
+            connection.send(createAck(consumerInfo, received, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        restartBroker();
+
+        // Phase 3: the remaining ids now start at index 0 of expectedIds.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, queue);
+        connection.send(consumerInfo);
+
+        for (String expectedId : expectedIds.subList(0, halfCount)) {
+            Message received = receiveMessage(connection);
+            assertNotNull("Should have received message "+expectedId+" by now!", received);
+            assertEquals(expectedId, received.getMessageId().toString());
+            connection.send(createAck(consumerInfo, received, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        connection.request(closeConnectionInfo(connectionInfo));
+    }
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java
Mon Jan  5 12:48:38 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.XARecoveryBrokerTest;
+
+/**
+ * Runs the inherited {@link XARecoveryBrokerTest} cases against a broker
+ * backed by a {@link KahaDBStore}.
+ * 
+ * @version $Revision: 712224 $
+ */
+public class KahaDBStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    public static Test suite() {
+        return suite(KahaDBStoreXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /** Creates a broker on a KahaDB store that has first been asked to delete all messages. */
+    protected BrokerService createBroker() throws Exception {
+        return newKahaBackedBroker(true);
+    }
+
+    /** Creates a broker on the same KahaDB directory without deleting its messages. */
+    protected BrokerService createRestartedBroker() throws Exception {
+        return newKahaBackedBroker(false);
+    }
+
+    /**
+     * Builds a broker whose persistence adapter is a KahaDB store rooted at
+     * target/activemq-data/kahadb.
+     *
+     * @param purgeStore when true, deleteAllMessages() is called on the store
+     *                   before it is attached to the broker
+     */
+    private BrokerService newKahaBackedBroker(boolean purgeStore) throws Exception {
+        BrokerService service = new BrokerService();
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(new File("target/activemq-data/kahadb"));
+        if (purgeStore) {
+            store.deleteAllMessages();
+        }
+        service.setPersistenceAdapter(store);
+        return service;
+    }
+
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
Mon Jan  5 12:48:38 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.kahadb.journal.Location;
+import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaEntryType;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+
+public class PBMesssagesTest extends TestCase {
+
+    /**
+     * Writes a KahaAddMessageCommand as one entry-type byte followed by the
+     * framed command, reads it back through KahaEntryType.createMessage(), and
+     * checks that the decoded command equals the one that was written.
+     */
+    public void testKahaAddMessageCommand() throws IOException {
+
+        KahaAddMessageCommand original = new KahaAddMessageCommand();
+        original.setDestination(new KahaDestination().setName("Foo").setType(DestinationType.QUEUE));
+        original.setMessage(new Buffer(new byte[] {1,2,3,4,5,6} ));
+        original.setMessageId("Hello World");
+
+        // Encode: the extra byte of capacity holds the entry-type tag.
+        DataByteArrayOutputStream out = new DataByteArrayOutputStream(original.serializedSizeFramed() + 1);
+        out.writeByte(original.type().getNumber());
+        original.writeFramed(out);
+        ByteSequence encoded = out.toByteSequence();
+
+        // Decode: the tag selects which command class to instantiate.
+        DataByteArrayInputStream in = new DataByteArrayInputStream(encoded);
+        KahaEntryType entryType = KahaEntryType.valueOf(in.readByte());
+        JournalCommand decoded = (JournalCommand)entryType.createMessage();
+        decoded.mergeFramed(in);
+
+        assertEquals(original, decoded);
+    }
+
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java
Mon Jan  5 12:48:38 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.perf;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProgressPrinter;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+/**
+ * This tests bulk loading and unloading of messages to a Queue.s
+ * 
+ * @version $Revision: 712224 $
+ */
+public class KahaBulkLoadingTest extends JmsTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(KahaBulkLoadingTest.class);
+
+    protected int messageSize = 1024 * 4;
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        // kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        broker.addConnector("tcp://localhost:0");
+        return broker;
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException
{
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+        factory.setUseAsyncSend(true);
+        return factory;
+    }
+
+    public void testQueueSendThenAddConsumer() throws Exception {
+        long start;
+        long end;
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        connection.setUseCompression(false);
+        connection.getPrefetchPolicy().setAll(10);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+        LOG.info("Receiving messages that are in the queue");
+        MessageConsumer consumer = session.createConsumer(destination);
+        BytesMessage msg = (BytesMessage)consumer.receive(2000);
+        int consumed = 0;
+        if( msg!=null ) {
+            consumed++;
+        }
+        while (true) {
+            int counter = 0;
+            if (msg == null) {
+                break;
+            }
+            end = start = System.currentTimeMillis();
+            int size = 0;
+            while ((end - start) < 5000) {
+                msg = (BytesMessage)consumer.receive(5000);
+                if (msg == null) {
+                    break;
+                }
+                counter++;
+                consumed++;
+                end = System.currentTimeMillis();
+                size += msg.getBodyLength();
+            }
+            LOG.info("Consumed: " + (counter * 1000.0 / (end - start)) + " " + " messages/sec,
" + (1.0 * size / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec ");
+        }
+        consumer.close();
+        LOG.info("Consumed " + consumed + " messages from the queue.");
+
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        LOG.info("Sending messages that are " + (messageSize / 1024.0) + "k large");
+        // Send a message to the broker.
+        start = System.currentTimeMillis();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                stop.set(true);
+            }
+        });
+
+        int produced = 0;
+        while (!stop.get()) {
+            end = start = System.currentTimeMillis();
+            int produceCount = 0;
+            while ((end - start) < 5000 && !stop.get()) {
+                BytesMessage bm = session.createBytesMessage();
+                bm.writeBytes(new byte[messageSize]);
+                producer.send(bm);
+                produceCount++;
+                produced++;
+                end = System.currentTimeMillis();
+            }
+            LOG.info("Produced: " + (produceCount * 1000.0 / (end - start)) + " messages/sec,
" + (1.0 * produceCount * messageSize / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) +
" megs/sec");
+        }
+        LOG.info("Prodcued " + produced + " messages to the queue.");
+
+    }
+
+    public static Test suite() {
+        return suite(KahaBulkLoadingTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java
Mon Jan  5 12:48:38 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.perf;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.perf.SimpleDurableTopicTest;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+/**
+ * Runs {@link SimpleDurableTopicTest} with the broker persisting through a
+ * {@link KahaDBStore}.
+ *
+ * @version $Revision: 712224 $
+ */
+public class KahaStoreDurableTopicTest extends SimpleDurableTopicTest {
+
+    /**
+     * Points the broker's data directory and a new KahaDB store at
+     * target/test-amq-data/perfTest/amqdb, then opens a connector on the
+     * given URI.
+     */
+    protected void configureBroker(BrokerService answer,String uri) throws Exception {
+        File storeDir = new File("target/test-amq-data/perfTest/amqdb");
+        storeDir.mkdirs();
+        // answer.setDeleteAllMessagesOnStartup(true);
+
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(storeDir);
+
+        answer.setDataDirectoryFile(storeDir);
+        answer.setPersistenceAdapter(store);
+        answer.addConnector(uri);
+    }
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java?rev=731704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java
Mon Jan  5 12:48:38 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.perf;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.perf.SimpleQueueTest;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+/**
+ * @version $Revision: 712224 $
+ */
+public class KahaStoreQueueTest extends SimpleQueueTest {
+    /** Configures the broker to persist through a KahaDBStore under target/. */
+    protected void configureBroker(BrokerService answer,String uri) throws Exception {
+        final File storeDir = new File("target/test-amq-data/perfTest/amqdb");
+        storeDir.mkdirs();
+        answer.setDeleteAllMessagesOnStartup(true);
+
+        final KahaDBStore kahaStore = new KahaDBStore();
+        kahaStore.setDirectory(storeDir);
+
+        // The broker data directory and the store directory are the same path.
+        answer.setDataDirectoryFile(storeDir);
+        answer.setPersistenceAdapter(kahaStore);
+        answer.addConnector(uri);
+    }
+
+}
+

Modified: activemq/trunk/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/pom.xml?rev=731704&r1=731703&r2=731704&view=diff
==============================================================================
--- activemq/trunk/kahadb/pom.xml (original)
+++ activemq/trunk/kahadb/pom.xml Mon Jan  5 12:48:38 2009
@@ -52,78 +52,27 @@
         </exclusion>
       </exclusions>
     </dependency>
+    
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.14</version>
-      <scope>compile</scope>
+      <scope>test</scope>
       <optional>true</optional>
     </dependency>
     
     <dependency>
-      <groupId>org.apache.activemq.protobuf</groupId>
-      <artifactId>activemq-protobuf</artifactId>
-    </dependency>        
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.xbean</groupId>
-      <artifactId>xbean-spring</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring-core</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring-beans</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring-context</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <optional>true</optional>
-    </dependency>
-        
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-core</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activeblaze</artifactId>
-      <version>1.0-SNAPSHOT</version>
-      <optional>true</optional>
-    </dependency>
-     
+
   </dependencies>
-  
-  <repositories>
-    <repository>
-      <id>chirino-zk-repo</id>
-      <name>Private ZooKeeper Repo</name>
-      <url>http://people.apache.org/~chirino/zk-repo/</url>
-    </repository>
-  </repositories>
+
 
   <build>
     <plugins>
+      <!--
       <plugin>
         <groupId>org.apache.xbean</groupId>
         <artifactId>maven-xbean-plugin</artifactId>
@@ -139,6 +88,7 @@
           </execution>
         </executions>
       </plugin>
+      -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
@@ -148,17 +98,6 @@
         </configuration>
       </plugin>
       <plugin>
-        <groupId>org.apache.activemq.protobuf</groupId>
-        <artifactId>activemq-protobuf</artifactId>
-         <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <forkMode>pertest</forkMode>



Mime
View raw message