activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r697971 - /activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
Date Mon, 22 Sep 2008 20:17:59 GMT
Author: chirino
Date: Mon Sep 22 13:17:59 2008
New Revision: 697971

URL: http://svn.apache.org/viewvc?rev=697971&view=rev
Log:
adding a test to verify that the store works well even when loaded with lots of messages.

Modified:
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java?rev=697971&r1=697970&r2=697971&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
(original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
Mon Sep 22 13:17:59 2008
@@ -18,12 +18,22 @@
 
 import java.io.File;
 import java.net.URI;
+import java.util.ArrayList;
 
 import junit.framework.Test;
 
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
 
 
 /**
@@ -58,4 +68,77 @@
         junit.textui.TestRunner.run(suite());
     }
 
+    
+    public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        
+        ArrayList<String> expected = new ArrayList<String>();
+        
+        int MESSAGE_COUNT = 10000;
+        for(int i=0; i < MESSAGE_COUNT; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+            expected.add(message.getMessageId().toString());
+        }
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+        producerInfo = createProducerInfo(sessionInfo);
+        connection.send(producerInfo);
+
+        for(int i=0; i < MESSAGE_COUNT/2; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull("Should have received message "+expected.get(0)+" by now!", m);
+            assertEquals(expected.remove(0), m.getMessageId().toString());
+            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+            connection.send(ack);
+        }
+        
+        connection.request(closeConnectionInfo(connectionInfo));
+        
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        for(int i=0; i < MESSAGE_COUNT/2; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull("Should have received message "+expected.get(i)+" by now!", m);
+            assertEquals(expected.get(i), m.getMessageId().toString());
+            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+            connection.send(ack);
+            
+            
+        }
+        
+        connection.request(closeConnectionInfo(connectionInfo));
+    }
 }



Mime
View raw message