activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1061899 - in /activemq/trunk/activemq-core/src/test/java/org/apache/activemq: bugs/AMQ3120Test.java util/ConsumerThread.java util/ProducerThread.java
Date Fri, 21 Jan 2011 16:48:50 GMT
Author: dejanb
Date: Fri Jan 21 16:48:50 2011
New Revision: 1061899

URL: http://svn.apache.org/viewvc?rev=1061899&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3120 - test case in the making

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ConsumerThread.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java?rev=1061899&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java Fri Jan 21 16:48:50 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.ConsumerThread;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+import javax.jms.*;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ3120Test {
+
+    private static final Log LOG = LogFactory.getLog(AMQ3120Test.class);
+
+    BrokerService broker = null;
+    File kahaDbDir = null;
+    private final Destination destination = new ActiveMQQueue("AMQ3120Test");
+    final String payload = new String(new byte[1024]);
+
+    protected void startBroker(boolean delete) throws Exception {
+        broker = new BrokerService();
+
+        //Start with a clean directory
+        kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
+        deleteDir(kahaDbDir);
+
+        broker.setSchedulerSupport(false);
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:0");
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setUseCache(false);
+        map.setDefaultEntry(entry);
+        broker.setDestinationPolicy(map);
+
+        configurePersistence(broker, delete);
+
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint an cleanup early and often
+        adapter.setCheckpointInterval(500);
+        adapter.setCleanupInterval(500);
+
+        if (!deleteAllOnStart) {
+            adapter.setForceRecoverIndex(true);
+        }
+
+    }
+
+    private boolean deleteDir(File dir) {
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            for (int i = 0; i < children.length; i++) {
+                boolean success = deleteDir(new File(dir, children[i]));
+                if (!success) {
+                    return false;
+                }
+            }
+        }
+
+        return dir.delete();
+    }
+
+    private int getFileCount(File dir){
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            return children.length;
+        }
+
+        return 0;
+    }
+
+    @Test
+    public void testCleanupOfFiles() throws Exception {
+
+        startBroker(false);
+        int fileCount = getFileCount(kahaDbDir);
+        assertEquals(4, fileCount);
+
+        Connection connection = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        connection.start();
+        Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ProducerThread producer = new ProducerThread(producerSess, destination) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                return sess.createTextMessage(payload + "::" + i);
+            }
+        };
+        producer.setSleep(1500);
+        ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
+        consumer.setBreakOnNull(false);
+
+        producer.start();
+        consumer.start();
+
+        producer.join();
+        consumer.join();
+
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+    }
+
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ConsumerThread.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ConsumerThread.java?rev=1061899&r1=1061898&r2=1061899&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ConsumerThread.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ConsumerThread.java Fri Jan 21 16:48:50 2011
@@ -29,6 +29,7 @@ public class ConsumerThread extends Thre
     int received = 0;
     Destination dest;
     Session sess;
+    boolean breakOnNull = true;
 
     public ConsumerThread(Session sess, Destination dest) {
         this.dest = dest;
@@ -47,7 +48,9 @@ public class ConsumerThread extends Thre
                     LOG.info("Received " + ((TextMessage)msg).getText());
                     received++;
                 } else {
-                    break;
+                    if (breakOnNull) {
+                        break;
+                    }
                 }
             }
         } catch (JMSException e) {
@@ -66,4 +69,12 @@ public class ConsumerThread extends Thre
     public int getReceived() {
         return received;
     }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public void setBreakOnNull(boolean breakOnNull) {
+        this.breakOnNull = breakOnNull;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java?rev=1061899&r1=1061898&r2=1061899&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java Fri Jan 21 16:48:50 2011
@@ -19,10 +19,7 @@ package org.apache.activemq.util;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
 
 public class ProducerThread extends Thread {
 
@@ -30,7 +27,7 @@ public class ProducerThread extends Thre
 
     int messageCount = 1000;
     Destination dest;
-    Session sess;
+    protected Session sess;
     int sleep = 0;
     int sentCount = 0;
 
@@ -44,7 +41,7 @@ public class ProducerThread extends Thre
         try {
             producer = sess.createProducer(dest);
             for (sentCount = 0; sentCount < messageCount; sentCount++) {
-                producer.send(sess.createTextMessage("test message: " + sentCount));
+                producer.send(createMessage(sentCount));
                 LOG.info("Sent 'test message: " + sentCount + "'");
                 if (sleep > 0) {
                     Thread.sleep(sleep);
@@ -63,6 +60,9 @@ public class ProducerThread extends Thre
         }
     }
 
+    protected Message createMessage(int i) throws Exception {
+        return sess.createTextMessage("test message: " + i);
+    }
 
     public void setMessageCount(int messageCount) {
         this.messageCount = messageCount;



Mime
View raw message