activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r667105 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/kaha/impl/async/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/acti...
Date Thu, 12 Jun 2008 14:31:28 GMT
Author: rajdavies
Date: Thu Jun 12 07:31:27 2008
New Revision: 667105

URL: http://svn.apache.org/viewvc?rev=667105&view=rev
Log:
Patch applied for https://issues.apache.org/activemq/browse/AMQ-1795

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
    activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Jun 12 07:31:27 2008
@@ -1226,7 +1226,11 @@
                     if (er.getException() instanceof JMSException) {
                         throw (JMSException)er.getException();
                     } else {
+                        try {
                         throw JMSExceptionSupport.create(er.getException());
+                        }catch(Throwable e) {
+                            LOG.error("Caught an exception trying to create a JMSException",e);
+                        }
                     }
                 }
                 return response;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Thu Jun 12 07:31:27 2008
@@ -71,6 +71,7 @@
     public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
     public static final String DEFAULT_FILE_PREFIX = "data-";
     public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
 
     private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
 
@@ -188,7 +189,7 @@
                 cleanup();
             }
         };
-        Scheduler.executePeriodically(cleanupTask, 1000 * 30);
+        Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
     }
 
     public void lock() throws IOException {
@@ -272,6 +273,7 @@
             if (currentWriteFile != null) {
                 currentWriteFile.linkAfter(nextWriteFile);
                 if (currentWriteFile.isUnused()) {
+                    System.err.println("remove current file unused:" + currentWriteFile);
                     removeDataFile(currentWriteFile);
                 }
             }
@@ -298,7 +300,7 @@
         DataFile dataFile = fileMap.get(key);
         if (dataFile == null) {
             LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
-            throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
+            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
         }
         return dataFile;
     }
@@ -308,7 +310,7 @@
         DataFile dataFile = fileMap.get(key);
         if (dataFile == null) {
             LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
-            throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
+            throw new IOException("Could not locate data file " + filePrefix  + item.getDataFileId());
         }
         return dataFile.getFile();
     }
@@ -411,7 +413,9 @@
             purgeList.add(dataFile);
         }
         for (DataFile dataFile : purgeList) {
-            forceRemoveDataFile(dataFile);
+            if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
+                forceRemoveDataFile(dataFile);
+            }
         }
     }
 
@@ -463,11 +467,11 @@
         dataFile.unlink();
         if (archiveDataLogs) {
             dataFile.move(getDirectoryArchive());
-            LOG.debug("moced data file " + dataFile + " to "
+            LOG.debug("moved data file " + dataFile + " to "
                     + getDirectoryArchive());
         } else {
             boolean result = dataFile.delete();
-            LOG.debug("discarding data file " + dataFile
+            LOG.info("discarding data file " + dataFile
                     + (result ? "successful " : "failed"));
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Thu Jun 12 07:31:27 2008
@@ -113,6 +113,7 @@
             if (debug) {
                 LOG.debug("Journalled message add for: " + id + ", at: " + location);
             }
+            this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
             addMessage(message, location);
         } else {
             if (debug) {
@@ -164,7 +165,6 @@
          try {
             lastLocation = location;
             messages.put(message.getMessageId(), data);
-            this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
         }finally {
             lock.unlock();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Jun 12 07:31:27 2008
@@ -90,13 +90,11 @@
     private static final boolean BROKEN_FILE_LOCK;
     private static final boolean DISABLE_LOCKING;
     private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
-
     private AsyncDataManager asyncDataManager;
     private ReferenceStoreAdapter referenceStoreAdapter;
     private TaskRunnerFactory taskRunnerFactory;
     private WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
-    private long cleanupInterval = 1000 * 30;
     private long checkpointInterval = 1000 * 60;
     private int maxCheckpointMessageAddSize = 1024 * 4;
     private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
@@ -116,6 +114,7 @@
     private boolean persistentIndex=true;
     private boolean useNio = true;
     private boolean archiveDataLogs=false;
+    private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
     private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
     private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
     private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
@@ -425,8 +424,12 @@
             }
             Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
             inProgress.add(lastDataFile);
-            Set<Integer> inUse = new HashSet<Integer>(referenceStoreAdapter.getReferenceFileIdsInUse());
-            asyncDataManager.consolidateDataFilesNotIn(inUse, inProgress);
+            inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
+            Location lastActiveTx = transactionStore.checkpoint();
+            if (lastActiveTx != null) {
+                lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
+            }
+            asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
         } catch (IOException e) {
             LOG.error("Could not cleanup data files: " + e, e);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Thu Jun 12 07:31:27 2008
@@ -44,6 +44,7 @@
     private boolean persistentIndex=true;
     private boolean useNio = true;
     private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
+    private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
 
 
     /**
@@ -60,9 +61,18 @@
         result.setReferenceStoreAdapter(getReferenceStoreAdapter());
         result.setUseNio(isUseNio());
         result.setMaxFileLength(getMaxFileLength());
+        result.setCleanupInterval(getCleanupInterval());
         return result;
     }
 
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+    
+    public void setCleanupInterval(long val) {
+        cleanupInterval = val;
+    }
+
     /**
      * @return the dataDirectory
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Thu Jun 12 07:31:27 2008
@@ -18,9 +18,12 @@
 package org.apache.activemq.store.amq;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
@@ -230,13 +233,13 @@
         // But we keep track of the first location of an operation
         // that was associated with an active tx. The journal can not
         // roll over active tx records.
-        Location rc = null;
+        Location minimumLocationInUse = null;
         synchronized (inflightTransactions) {
             for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
                 AMQTx tx = iter.next();
                 Location location = tx.getLocation();
-                if (rc == null || rc.compareTo(location) < 0) {
-                    rc = location;
+                if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
+                    minimumLocationInUse = location;
                 }
             }
         }
@@ -244,11 +247,11 @@
             for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
                 AMQTx tx = iter.next();
                 Location location = tx.getLocation();
-                if (rc == null || rc.compareTo(location) < 0) {
-                    rc = location;
+                if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
+                    minimumLocationInUse = location;
                 }
             }
-            return rc;
+            return minimumLocationInUse;
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Thu Jun 12 07:31:27 2008
@@ -234,8 +234,8 @@
      * @throws IOException
      * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
      */
-    public Set<Integer> getReferenceFileIdsInUse() throws IOException {
-        return recordReferences.keySet();
+    public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
+        return new HashSet<Integer>(recordReferences.keySet());
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java Thu Jun 12 07:31:27 2008
@@ -1,41 +1,41 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-public class MessageSender {
-    private MessageProducer producer;
-    private Session session;
-
-    public MessageSender(String queueName, Connection connection, boolean useTransactedSession) throws Exception {
-        session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer = session.createProducer(session.createQueue(queueName));
-    }
-
-    public void send(String payload) throws Exception {
-        ObjectMessage message = session.createObjectMessage();
-        message.setObject(payload);
-        producer.send(message);
-        if (session.getTransacted()) {
-            session.commit();
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+public class MessageSender {
+    private MessageProducer producer;
+    private Session session;
+
+    public MessageSender(String queueName, Connection connection, boolean useTransactedSession, boolean topic) throws Exception {
+        session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(topic ? session.createTopic(queueName) : session.createQueue(queueName));
+    }
+
+    public void send(String payload) throws Exception {
+        ObjectMessage message = session.createObjectMessage();
+        message.setObject(payload);
+        producer.send(message);
+        if (session.getTransacted()) {
+            session.commit();
+        }
+    }
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java?rev=667105&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java Thu Jun 12 07:31:27 2008
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * Try and replicate:
+ * Caused by: java.io.IOException: Could not locate data file data--188
+ *  at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:302)
+ *  at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:614)
+ *  at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:523)
+ */
+
+public class MissingDataFileTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(MissingDataFileTest.class);
+    
+    private static int counter = 300;
+
+    private static int hectorToHaloCtr;
+    private static int xenaToHaloCtr;
+    private static int troyToHaloCtr;
+
+    private static int haloToHectorCtr;
+    private static int haloToXenaCtr;
+    private static int haloToTroyCtr;
+
+    private String hectorToHalo = "hectorToHalo";
+    private String xenaToHalo = "xenaToHalo";
+    private String troyToHalo = "troyToHalo";
+
+    private String haloToHector = "haloToHector";
+    private String haloToXena = "haloToXena";
+    private String haloToTroy = "haloToTroy";
+
+
+    private BrokerService broker;
+
+    private Connection hectorConnection;
+    private Connection xenaConnection;
+    private Connection troyConnection;
+    private Connection haloConnection;
+
+    private final Object lock = new Object();
+    final boolean useTopic = false;
+    final boolean useSleep = true;
+    
+    protected static final String payload = new String(new byte[500]);
+
+    public Connection createConnection() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        return factory.createConnection();
+    }
+
+    public Session createSession(Connection connection, boolean transacted) throws JMSException {
+        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:61616").setName("Default");
+   
+        SystemUsage systemUsage;
+        systemUsage = new SystemUsage();
+        systemUsage.getMemoryUsage().setLimit(1024 * 10); // Just a few messages 
+        broker.setSystemUsage(systemUsage);
+        
+        AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
+        factory.setMaxFileLength(2*1024); // ~4 messages
+        factory.setCleanupInterval(5000); // every few seconds
+        
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    public void tearDown() throws Exception {
+        hectorConnection.close();
+        xenaConnection.close();
+        troyConnection.close();
+        haloConnection.close();
+        broker.stop();
+    }
+
+    public void testForNoDataFoundError() throws Exception {
+        
+        startBroker();
+        hectorConnection = createConnection();
+        Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic);
+        Receiver hHectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToHectorCtr++;
+                if (haloToHectorCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(haloToHectorCtr);
+            }
+        };
+        buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver, useTopic);
+
+        troyConnection = createConnection();
+        Thread troyThread = buildProducer(troyConnection, troyToHalo);
+        Receiver hTroyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToTroyCtr++;
+                if (haloToTroyCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(haloToTroyCtr);
+            }
+        };
+        buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver, false);
+
+        xenaConnection = createConnection();
+        Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
+        Receiver hXenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToXenaCtr++;
+                if (haloToXenaCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(haloToXenaCtr);
+            }
+        };
+        buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver, false);
+
+        haloConnection = createConnection();
+        final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection, false);
+        final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false);
+        final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false);
+        Receiver hectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                hectorToHaloCtr++;
+                troySender.send(payload);
+                if (hectorToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                    possiblySleep(hectorToHaloCtr);
+                }
+            }
+        };
+        Receiver xenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                xenaToHaloCtr++;
+                hectorSender.send(payload);
+                if (xenaToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+                possiblySleep(xenaToHaloCtr);
+            }
+        };
+        Receiver troyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                troyToHaloCtr++;
+                xenaSender.send(payload);
+                if (troyToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver, false);
+        buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver, false);
+        buildReceiver(haloConnection, troyToHalo, true, troyReceiver, false);
+
+        haloConnection.start();
+
+        troyConnection.start();
+        troyThread.start();
+
+        xenaConnection.start();
+        xenaThread.start();
+
+        hectorConnection.start();
+        hectorThread.start();
+        waitForMessagesToBeDelivered();
+        // number of messages received should match messages sent
+        assertEquals(hectorToHaloCtr, counter);
+        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
+        assertEquals(xenaToHaloCtr, counter);
+        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
+        assertEquals(troyToHaloCtr, counter);
+        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
+        assertEquals(haloToHectorCtr, counter);
+        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
+        assertEquals(haloToXenaCtr, counter);
+        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
+        assertEquals(haloToTroyCtr, counter);
+        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
+
+    }
+
+    protected void possiblySleep(int count) throws InterruptedException {
+        if (useSleep) {
+            if (count % 100 == 0) {
+                Thread.sleep(5000);
+            }
+        }
+        
+    }
+
+    protected void waitForMessagesToBeDelivered() {
+        // let's give the listeners enough time to read all messages
+        long maxWaitTime = counter * 1000;
+        long waitTime = maxWaitTime;
+        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+        synchronized (lock) {
+            boolean hasMessages = true;
+            while (hasMessages && waitTime >= 0) {
+                try {
+                    lock.wait(200);
+                } catch (InterruptedException e) {
+                    LOG.error(e);
+                }
+                // check if all messages have been received
+                hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
+                              || haloToTroyCtr < counter;
+                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+            }
+        }
+    }
+
+    public MessageSender buildTransactionalProducer(String queueName, Connection connection, boolean isTopic) throws Exception {
+
+        return new MessageSender(queueName, connection, true, isTopic);
+    }
+
+    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
+        return buildProducer(connection, queueName, false, false);
+    }
+    
+    public Thread buildProducer(Connection connection, final String queueName, boolean transacted, boolean isTopic) throws Exception {
+        final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic);
+        Thread thread = new Thread() {
+            public synchronized void run() {
+                for (int i = 0; i < counter; i++) {
+                    try {
+                        producer.send(payload );
+                    } catch (Exception e) {
+                        throw new RuntimeException("on " + queueName + " send", e);
+                    }
+                }
+            }
+        };
+        return thread;
+    }
+
+    public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
+        final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName));
+        MessageListener messageListener = new MessageListener() {
+
+            public void onMessage(Message message) {
+                try {
+                    ObjectMessage objectMessage = (ObjectMessage)message;
+                    String s = (String)objectMessage.getObject();
+                    receiver.receive(s);
+                    if (session.getTransacted()) {
+                        session.commit();
+                    }
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        inputMessageConsumer.setMessageListener(messageListener);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java Thu Jun 12 07:31:27 2008
@@ -1,286 +1,286 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/*
- * simulate message flow which cause the following exception in the broker
- * (exception logged by client) <p/> 2007-07-24 13:51:23,624
- * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
- * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
- * has not been started. at
- * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
- * This appears to be consistent in a MacBook. Haven't been able to replicate it
- * on Windows though
- */
-public class TransactionNotStartedErrorTest extends TestCase {
-
-    private static final Log LOG = LogFactory.getLog(TransactionNotStartedErrorTest.class);
-    
-    private static int counter = 500;
-
-    private static int hectorToHaloCtr;
-    private static int xenaToHaloCtr;
-    private static int troyToHaloCtr;
-
-    private static int haloToHectorCtr;
-    private static int haloToXenaCtr;
-    private static int haloToTroyCtr;
-
-    private String hectorToHalo = "hectorToHalo";
-    private String xenaToHalo = "xenaToHalo";
-    private String troyToHalo = "troyToHalo";
-
-    private String haloToHector = "haloToHector";
-    private String haloToXena = "haloToXena";
-    private String haloToTroy = "haloToTroy";
-
-
-    private BrokerService broker;
-
-    private Connection hectorConnection;
-    private Connection xenaConnection;
-    private Connection troyConnection;
-    private Connection haloConnection;
-
-    private final Object lock = new Object();
-
-    public Connection createConnection() throws JMSException {
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
-        return factory.createConnection();
-    }
-
-    public Session createSession(Connection connection, boolean transacted) throws JMSException {
-        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public void startBroker() throws Exception {
-        broker = new BrokerService();
-        broker.setDeleteAllMessagesOnStartup(true);
-        broker.setPersistent(true);
-        broker.setUseJmx(true);
-        broker.addConnector("tcp://localhost:61616").setName("Default");
-        broker.start();
-        LOG.info("Starting broker..");
-    }
-
-    public void tearDown() throws Exception {
-        hectorConnection.close();
-        xenaConnection.close();
-        troyConnection.close();
-        haloConnection.close();
-        broker.stop();
-    }
-
-    public void testTransactionNotStartedError() throws Exception {
-        startBroker();
-        hectorConnection = createConnection();
-        Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
-        Receiver hHectorReceiver = new Receiver() {
-            public void receive(String s) throws Exception {
-                haloToHectorCtr++;
-                if (haloToHectorCtr >= counter) {
-                    synchronized (lock) {
-                        lock.notifyAll();
-                    }
-                }
-            }
-        };
-        buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
-
-        troyConnection = createConnection();
-        Thread troyThread = buildProducer(troyConnection, troyToHalo);
-        Receiver hTroyReceiver = new Receiver() {
-            public void receive(String s) throws Exception {
-                haloToTroyCtr++;
-                if (haloToTroyCtr >= counter) {
-                    synchronized (lock) {
-                        lock.notifyAll();
-                    }
-                }
-            }
-        };
-        buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
-
-        xenaConnection = createConnection();
-        Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
-        Receiver hXenaReceiver = new Receiver() {
-            public void receive(String s) throws Exception {
-                haloToXenaCtr++;
-                if (haloToXenaCtr >= counter) {
-                    synchronized (lock) {
-                        lock.notifyAll();
-                    }
-                }
-            }
-        };
-        buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
-
-        haloConnection = createConnection();
-        final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
-        final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
-        final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);
-        Receiver hectorReceiver = new Receiver() {
-            public void receive(String s) throws Exception {
-                hectorToHaloCtr++;
-                troySender.send("halo to troy because of hector");
-                if (hectorToHaloCtr >= counter) {
-                    synchronized (lock) {
-                        lock.notifyAll();
-                    }
-                }
-            }
-        };
-        Receiver xenaReceiver = new Receiver() {
-            public void receive(String s) throws Exception {
-                xenaToHaloCtr++;
-                hectorSender.send("halo to hector because of xena");
-                if (xenaToHaloCtr >= counter) {
-                    synchronized (lock) {
-                        lock.notifyAll();
-                    }
-                }
-            }
-        };
-        Receiver troyReceiver = new Receiver() {
-            public void receive(String s) throws Exception {
-                troyToHaloCtr++;
-                xenaSender.send("halo to xena because of troy");
-                if (troyToHaloCtr >= counter) {
-                    synchronized (lock) {
-                        lock.notifyAll();
-                    }
-                }
-            }
-        };
-        buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
-        buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
-        buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
-
-        haloConnection.start();
-
-        troyConnection.start();
-        troyThread.start();
-
-        xenaConnection.start();
-        xenaThread.start();
-
-        hectorConnection.start();
-        hectorThread.start();
-        waitForMessagesToBeDelivered();
-        // number of messages received should match messages sent
-        assertEquals(hectorToHaloCtr, counter);
-        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
-        assertEquals(xenaToHaloCtr, counter);
-        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
-        assertEquals(troyToHaloCtr, counter);
-        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
-        assertEquals(haloToHectorCtr, counter);
-        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
-        assertEquals(haloToXenaCtr, counter);
-        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
-        assertEquals(haloToTroyCtr, counter);
-        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
-
-    }
-
-    protected void waitForMessagesToBeDelivered() {
-        // let's give the listeners enough time to read all messages
-        long maxWaitTime = counter * 3000;
-        long waitTime = maxWaitTime;
-        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
-
-        synchronized (lock) {
-            boolean hasMessages = true;
-            while (hasMessages && waitTime >= 0) {
-                try {
-                    lock.wait(200);
-                } catch (InterruptedException e) {
-                    LOG.error(e);
-                }
-                // check if all messages have been received
-                hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
-                              || haloToTroyCtr < counter;
-                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
-            }
-        }
-    }
-
-    public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
-
-        return new MessageSender(queueName, connection, true);
-    }
-
-    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
-
-        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final MessageSender producer = new MessageSender(queueName, connection, false);
-        Thread thread = new Thread() {
-
-            public synchronized void run() {
-                for (int i = 0; i < counter; i++) {
-                    try {
-                        producer.send(queueName);
-                        if (session.getTransacted()) {
-                            session.commit();
-                        }
-
-                    } catch (Exception e) {
-                        throw new RuntimeException("on " + queueName + " send", e);
-                    }
-                }
-            }
-        };
-        return thread;
-    }
-
-    public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception {
-        final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
-        MessageListener messageListener = new MessageListener() {
-
-            public void onMessage(Message message) {
-                try {
-                    ObjectMessage objectMessage = (ObjectMessage)message;
-                    String s = (String)objectMessage.getObject();
-                    receiver.receive(s);
-                    if (session.getTransacted()) {
-                        session.commit();
-                    }
-
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        };
-        inputMessageConsumer.setMessageListener(messageListener);
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * simulate message flow which cause the following exception in the broker
+ * (exception logged by client) <p/> 2007-07-24 13:51:23,624
+ * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
+ * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
+ * has not been started. at
+ * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
+ * This appears to be consistent in a MacBook. Haven't been able to replicate it
+ * on Windows though
+ */
+public class TransactionNotStartedErrorTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(TransactionNotStartedErrorTest.class);
+    
+    private static int counter = 500;
+
+    private static int hectorToHaloCtr;
+    private static int xenaToHaloCtr;
+    private static int troyToHaloCtr;
+
+    private static int haloToHectorCtr;
+    private static int haloToXenaCtr;
+    private static int haloToTroyCtr;
+
+    private String hectorToHalo = "hectorToHalo";
+    private String xenaToHalo = "xenaToHalo";
+    private String troyToHalo = "troyToHalo";
+
+    private String haloToHector = "haloToHector";
+    private String haloToXena = "haloToXena";
+    private String haloToTroy = "haloToTroy";
+
+
+    private BrokerService broker;
+
+    private Connection hectorConnection;
+    private Connection xenaConnection;
+    private Connection troyConnection;
+    private Connection haloConnection;
+
+    private final Object lock = new Object();
+
+    public Connection createConnection() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        return factory.createConnection();
+    }
+
+    public Session createSession(Connection connection, boolean transacted) throws JMSException {
+        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:61616").setName("Default");
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    public void tearDown() throws Exception {
+        hectorConnection.close();
+        xenaConnection.close();
+        troyConnection.close();
+        haloConnection.close();
+        broker.stop();
+    }
+
+    public void testTransactionNotStartedError() throws Exception {
+        startBroker();
+        hectorConnection = createConnection();
+        Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
+        Receiver hHectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToHectorCtr++;
+                if (haloToHectorCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
+
+        troyConnection = createConnection();
+        Thread troyThread = buildProducer(troyConnection, troyToHalo);
+        Receiver hTroyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToTroyCtr++;
+                if (haloToTroyCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
+
+        xenaConnection = createConnection();
+        Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
+        Receiver hXenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToXenaCtr++;
+                if (haloToXenaCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
+
+        haloConnection = createConnection();
+        final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
+        final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
+        final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);
+        Receiver hectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                hectorToHaloCtr++;
+                troySender.send("halo to troy because of hector");
+                if (hectorToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        Receiver xenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                xenaToHaloCtr++;
+                hectorSender.send("halo to hector because of xena");
+                if (xenaToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        Receiver troyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                troyToHaloCtr++;
+                xenaSender.send("halo to xena because of troy");
+                if (troyToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
+        buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
+        buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
+
+        haloConnection.start();
+
+        troyConnection.start();
+        troyThread.start();
+
+        xenaConnection.start();
+        xenaThread.start();
+
+        hectorConnection.start();
+        hectorThread.start();
+        waitForMessagesToBeDelivered();
+        // number of messages received should match messages sent
+        assertEquals(hectorToHaloCtr, counter);
+        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
+        assertEquals(xenaToHaloCtr, counter);
+        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
+        assertEquals(troyToHaloCtr, counter);
+        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
+        assertEquals(haloToHectorCtr, counter);
+        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
+        assertEquals(haloToXenaCtr, counter);
+        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
+        assertEquals(haloToTroyCtr, counter);
+        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
+
+    }
+
+    protected void waitForMessagesToBeDelivered() {
+        // let's give the listeners enough time to read all messages
+        long maxWaitTime = counter * 3000;
+        long waitTime = maxWaitTime;
+        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+        synchronized (lock) {
+            boolean hasMessages = true;
+            while (hasMessages && waitTime >= 0) {
+                try {
+                    lock.wait(200);
+                } catch (InterruptedException e) {
+                    LOG.error(e);
+                }
+                // check if all messages have been received
+                hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
+                              || haloToTroyCtr < counter;
+                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+            }
+        }
+    }
+
+    public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
+
+        return new MessageSender(queueName, connection, true, false);
+    }
+
+    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageSender producer = new MessageSender(queueName, connection, false, false);
+        Thread thread = new Thread() {
+
+            public synchronized void run() {
+                for (int i = 0; i < counter; i++) {
+                    try {
+                        producer.send(queueName);
+                        if (session.getTransacted()) {
+                            session.commit();
+                        }
+
+                    } catch (Exception e) {
+                        throw new RuntimeException("on " + queueName + " send", e);
+                    }
+                }
+            }
+        };
+        return thread;
+    }
+
+    public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception {
+        final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
+        MessageListener messageListener = new MessageListener() {
+
+            public void onMessage(Message message) {
+                try {
+                    ObjectMessage objectMessage = (ObjectMessage)message;
+                    String s = (String)objectMessage.getObject();
+                    receiver.receive(s);
+                    if (session.getTransacted()) {
+                        session.commit();
+                    }
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        inputMessageConsumer.setMessageListener(messageListener);
+    }
+
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java Thu Jun 12 07:31:27 2008
@@ -40,7 +40,6 @@
     protected void setUp() throws Exception {
         numberofProducers=1;
         numberOfConsumers=1;
-        this.consumerSleepDuration=0;
         super.setUp();
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java Thu Jun 12 07:31:27 2008
@@ -20,6 +20,9 @@
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
 
 /**
  * @version $Revision: 1.3 $
@@ -28,12 +31,22 @@
     
     protected void setUp() throws Exception {
         numberOfDestinations=1;
-        numberOfConsumers = 1;
+        numberOfConsumers = 4;
         numberofProducers = 1;
         sampleCount=1000;
         playloadSize = 1024;
         super.setUp();
     }
+    
+    protected void configureBroker(BrokerService answer,String uri) throws Exception {
+        AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+        persistenceFactory.setMaxFileLength(1024*16);
+        answer.setPersistenceFactory(persistenceFactory);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(uri);
+        answer.setUseShutdownHook(false);
+    }
+    
     protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte payload[]) throws JMSException {
         PerfProducer pp = new PerfProducer(fac, dest, payload);
         pp.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -42,7 +55,13 @@
 
     protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
         PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number);
-        result.setInitialDelay(20000);
+        result.setInitialDelay(2000);
+        return result;
+    }
+    
+    protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
+        ActiveMQConnectionFactory result = super.createConnectionFactory(uri);
+        result.setSendAcksAsync(false);
         return result;
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java Thu Jun 12 07:31:27 2008
@@ -65,7 +65,6 @@
             LOG.info("Testing against destination: " + destination);
             for (int i = 0; i < numberOfConsumers; i++) {
                 consumers[i] = createConsumer(consumerFactory, destination, i);
-                consumers[i].setSleepDuration(consumerSleepDuration);
                 consumers[i].start();
             }
             for (int i = 0; i < numberofProducers; i++) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java Thu Jun 12 07:31:27 2008
@@ -18,18 +18,13 @@
 
 import java.util.ArrayList;
 import java.util.List;
-
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 
 /**
  * @version $Revision: 1.3 $
@@ -37,18 +32,24 @@
 public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
 
     protected void setUp() throws Exception {
-        numberOfConsumers = 10;
-        numberofProducers = 10;
-        //this.consumerSleepDuration=100;
+        numberOfConsumers = 1;
+        numberofProducers = 1;
         super.setUp();
     }
     protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
         PerfProducer pp = new PerfProducer(fac, dest, payload);
         pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        //pp.setTimeToLive(100);
+        pp.setTimeToLive(100);
         return pp;
     }
     
+    protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
+        PerfConsumer result =  new PerfConsumer(fac, dest);
+        result.setInitialDelay(20*1000);
+        return result;
+    }
+    
+    /*
     protected void configureBroker(BrokerService answer,String uri) throws Exception {
         answer.setPersistent(false);
         final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
@@ -65,4 +66,5 @@
         answer.setDestinationPolicy(policyMap);
         super.configureBroker(answer, uri);
     }
+    */
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Thu Jun 12 07:31:27 2008
@@ -55,9 +55,7 @@
     protected byte[] array;
     protected ConnectionFactory factory;
     
-    protected long consumerSleepDuration=0;
-
-    /**
+     /**
      * Sets up a test where the producer and consumer have their own connection.
      * 
      * @see junit.framework.TestCase#setUp()
@@ -84,7 +82,6 @@
             LOG.info("Testing against destination: " + destination);
             for (int i = 0; i < numberOfConsumers; i++) {
                 consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
-                consumers[consumerCount].setSleepDuration(consumerSleepDuration);
                 consumerCount++;
             }
             for (int i = 0; i < numberofProducers; i++) {

Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=667105&r1=667104&r2=667105&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Jun 12 07:31:27 2008
@@ -18,7 +18,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, out, stdout
 
 log4j.logger.org.apache.activemq.spring=WARN
 



Mime
View raw message