activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r633639 [6/7] - in /activemq/sandbox/activemq-router: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/active...
Date Tue, 04 Mar 2008 21:01:57 GMT
Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/package.html?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/package.html (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/package.html Tue Mar  4 13:01:41 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Helpful utility classes used by the rest of the router package.
+
+</body>
+</html>

Added: activemq/sandbox/activemq-router/src/main/resources/META-INF/persistence.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/resources/META-INF/persistence.xml?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/resources/META-INF/persistence.xml (added)
+++ activemq/sandbox/activemq-router/src/main/resources/META-INF/persistence.xml Tue Mar  4 13:01:41 2008
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2006 The Apache Software Foundation.
+ 
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    version="1.0">
+    <persistence-unit name="activemq" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+        <class>org.apache.activemq.broker.router.index.jpa.model.IndexRecord</class>
+        <class>org.apache.activemq.broker.router.index.jpa.model.ReferenceRecord</class>
+        <class>org.apache.activemq.broker.router.index.jpa.model.StoreMetadata</class>
+    </persistence-unit>
+</persistence>

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.api;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.usage.SystemUsage;
+
+/**
+ * Do-nothing {@link Destination} implementation for use in tests.
+ * <p>
+ * Every operation is a no-op: the object-returning accessors return
+ * <code>null</code> and {@link #lockForDispatch} always answers
+ * <code>false</code>.
+ */
+public class StubDestination implements Destination {
+
+    // ------------------------------------------------------------------
+    // Lifecycle
+    // ------------------------------------------------------------------
+
+    public void start() throws Exception {
+        // nothing to start
+    }
+
+    public void stop() throws Exception {
+        // nothing to stop
+    }
+
+    // ------------------------------------------------------------------
+    // Subscriptions
+    // ------------------------------------------------------------------
+
+    public void addSubscription(ClientSubscription subscription) throws Exception {
+        // ignored
+    }
+
+    public void removeSubscription(ClientSubscription subscription) throws Exception {
+        // ignored
+    }
+
+    // ------------------------------------------------------------------
+    // Message flow
+    // ------------------------------------------------------------------
+
+    public void enqueue(RequestContext requestContext, Message message, Runnable onStored) throws Exception {
+        // ignored; note that onStored is never invoked
+    }
+
+    public void dequeue(RequestContext context, MessageAck ack, DataStore dataStore, long storeId) throws Exception {
+        // ignored
+    }
+
+    public boolean lockForDispatch(ClientSubscription source, CacheEntry ref) {
+        return false;
+    }
+
+    // ------------------------------------------------------------------
+    // Accessors (all unset)
+    // ------------------------------------------------------------------
+
+    public ActiveMQDestination getName() {
+        return null;
+    }
+
+    public Router getRouter() {
+        return null;
+    }
+
+    public SystemUsage getSystemUsage() {
+        return null;
+    }
+
+    public AtomicInteger getConsumerCounter() {
+        return null;
+    }
+
+    public AtomicInteger getProducerCounter() {
+        return null;
+    }
+
+    public AtomicInteger getEnqueueCounter() {
+        return null;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.router.index.api.DataIndex;
+import org.apache.activemq.broker.router.index.api.DataIndexManager;
+import org.apache.activemq.broker.router.index.api.IndexEntry;
+import org.apache.activemq.broker.router.index.api.ReferenceIndex;
+import org.apache.activemq.kaha.impl.async.Location;
+
+abstract public class IndexTestSupport extends TestCase {
+
+    DataIndexManager dataIndexManager;
+
+    /**
+     * Creates and starts a fresh {@link DataIndexManager}, then removes every
+     * store it reports so that each test begins without any stores.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        dataIndexManager = createDataIndexManager();
+        dataIndexManager.start();
+        // Drop whatever stores the freshly started manager already knows about.
+        for (DataIndex existing : dataIndexManager.getStores()) {
+            dataIndexManager.removeStore(existing);
+        }
+    }
+
+    /**
+     * Supplies the {@link DataIndexManager} implementation under test.
+     * <p>
+     * Called from {@link #setUp()} and {@link #restartDataIndexManager()};
+     * both callers invoke <code>start()</code> on the returned manager
+     * themselves.
+     *
+     * @return the manager to run the tests against
+     * @throws Exception if the manager cannot be created
+     */
+    abstract protected DataIndexManager createDataIndexManager() throws Exception;
+
+    /**
+     * Stops the current manager (when there is one) and replaces it with a
+     * newly created, started manager. Tests use this to check what survives
+     * a restart.
+     *
+     * @throws Exception if stopping, creating or starting a manager fails
+     */
+    protected void restartDataIndexManager() throws Exception {
+        DataIndexManager previous = dataIndexManager;
+        if (previous != null) {
+            previous.stop();
+        }
+
+        DataIndexManager replacement = createDataIndexManager();
+        dataIndexManager = replacement;
+        dataIndexManager.start();
+    }
+
+    /**
+     * Stops the manager used by the test and drops the reference to it.
+     * <p>
+     * The manager may be <code>null</code> here (for example when a restart
+     * inside a test failed part-way), so the stop is guarded the same way as
+     * in {@link #restartDataIndexManager()}; otherwise a
+     * NullPointerException raised during tear-down could hide the failure
+     * that actually broke the test. The reference is cleared even when
+     * <code>stop()</code> throws.
+     *
+     * @throws Exception if stopping the manager fails
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (dataIndexManager != null) {
+                dataIndexManager.stop();
+            }
+        } finally {
+            dataIndexManager = null;
+        }
+    }
+
+    /**
+     * Checks that adding and removing a {@link DataIndex} is reflected by the
+     * manager's lookups, and that both the add and the remove are still in
+     * effect after the manager has been restarted.
+     *
+     * @throws Exception
+     */
+    public void testCreateDestroyDataIndex() throws Exception {
+        final String name = getName();
+
+        // Nothing is registered to begin with.
+        assertNull(dataIndexManager.getStore(name));
+        assertTrue(dataIndexManager.getStores().isEmpty());
+
+        // Adding the store makes it visible by name and in the store list.
+        DataIndex created = dataIndexManager.addStore(name);
+        assertNotNull(created);
+        assertSame(created, dataIndexManager.getStore(name));
+
+        List<DataIndex> stores = dataIndexManager.getStores();
+        assertEquals(1, stores.size());
+        assertTrue(stores.contains(created));
+
+        // The store must still be there after a restart.
+        restartDataIndexManager();
+
+        DataIndex reloaded = dataIndexManager.getStore(name);
+        assertNotNull(reloaded);
+        assertEquals(name, reloaded.getName());
+        stores = dataIndexManager.getStores();
+        assertEquals(1, stores.size());
+        assertTrue(stores.contains(reloaded));
+
+        // Once removed, the store must stay gone after a restart.
+        dataIndexManager.removeStore(reloaded);
+        restartDataIndexManager();
+
+        assertNull(dataIndexManager.getStore(name));
+        assertTrue(dataIndexManager.getStores().isEmpty());
+    }
+
+    /**
+     * Checks that adding and removing a {@link ReferenceIndex} on a
+     * {@link DataIndex} is reflected by the data index's lookups, and that
+     * both the add and the remove are still in effect after the manager has
+     * been restarted.
+     *
+     * @throws Exception
+     */
+    public void testCreateDestroyReferenceIndex() throws Exception {
+        final String dataName = getName();
+        final String refName = getName() + "-ref";
+
+        DataIndex dataIndex = dataIndexManager.addStore(dataName);
+
+        // The new data index starts without reference indexes.
+        assertNull(dataIndex.getStore(refName));
+        assertTrue(dataIndex.getStores().isEmpty());
+
+        // Adding the reference index makes it visible by name and in the list.
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+        assertNotNull(refIndex);
+        assertSame(refIndex, dataIndex.getStore(refName));
+
+        List<ReferenceIndex> refIndexes = dataIndex.getStores();
+        assertEquals(1, refIndexes.size());
+        assertTrue(refIndexes.contains(refIndex));
+
+        // The reference index must still be there after a restart.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(dataName);
+
+        refIndex = dataIndex.getStore(refName);
+        assertNotNull(refIndex);
+        assertEquals(refName, refIndex.getName());
+        refIndexes = dataIndex.getStores();
+        assertEquals(1, refIndexes.size());
+        assertTrue(refIndexes.contains(refIndex));
+
+        // Once removed, the reference index must stay gone after a restart.
+        dataIndex.removeStore(refIndex);
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(dataName);
+
+        assertNull(dataIndex.getStore(refName));
+        assertTrue(dataIndex.getStores().isEmpty());
+    }
+
+    /**
+     * Verify that index properties can be stored without any issues, on both
+     * a DataIndex and a ReferenceIndex, and that they are still readable
+     * after the manager has been restarted.
+     *
+     * @throws Exception
+     */
+    public void testIndexProperties() throws Exception {
+        final String indexName = getName();
+        final String refName = indexName + "-ref";
+
+        // Build a relatively large property map.
+        Map<String, String> expected = new HashMap<String, String>();
+        for (int n = 0; n < 1000; n++) {
+            expected.put("key" + n, "value" + n);
+        }
+
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+        dataIndex.setProperties(expected);
+        assertEquals(expected, dataIndex.getProperties());
+
+        // Restart and verify that the properties were preserved.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+        assertEquals(expected, dataIndex.getProperties());
+
+        // Clear the data index properties and put them on a reference index
+        // instead.
+        dataIndex.setProperties(null);
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+        refIndex.setProperties(expected);
+
+        // After another restart the data index has no properties while the
+        // reference index still carries the full map.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+        assertNull(dataIndex.getProperties());
+
+        refIndex = dataIndex.getStore(refName);
+        assertEquals(expected, refIndex.getProperties());
+    }
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a DataIndex.
+     * <p>
+     * Adds a batch of entries, restarts the manager and checks that they load
+     * back in insertion order; removes every other entry, restarts again and
+     * checks that exactly the untouched entries remain; finally checks that
+     * removing and re-adding the store leaves it empty.
+     *
+     * @throws Exception if any index operation fails
+     */
+    public void testDataIndexCRD() throws Exception {
+        String indexName = getName();
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+
+        int count = 10;
+        IndexEntry ie[] = new IndexEntry[count];
+        for (int i = 0; i < count; i++) {
+            ie[i] = dataIndex.addMessage(Long.valueOf(i), createLocation(0, i));
+            assertNotNull(ie[i]);
+            assertNotNull(ie[i].getIndex());
+            assertNotNull(ie[i].getId());
+            assertNotNull(ie[i].getLocation());
+        }
+
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+
+        assertEquals(count, dataIndex.size());
+
+        // Ask for more than was stored to show that nothing extra comes back.
+        List<IndexEntry> loaded = dataIndex.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (IndexEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getIndex());
+            assertEquals(ie[i], entry);
+            i++;
+        }
+
+        // Let's delete every other record (positions 0, 2, 4, ...).
+        int deleteCount = 0;
+        i = 0;
+        for (IndexEntry entry : loaded) {
+            if (i % 2 == 0) {
+                dataIndex.remove(entry.getId());
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+
+        assertEquals(count - deleteCount, dataIndex.size());
+
+        loaded = dataIndex.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        // The survivors are the odd positions, still in insertion order. The
+        // check must be driven by the loop variable j: the counter i left
+        // over from the delete loop equals count at this point, so testing
+        // it would skip every comparison.
+        Iterator<IndexEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (j % 2 != 0) {
+                assertTrue(iterator.hasNext());
+                assertEquals(ie[j], iterator.next());
+            }
+        }
+        assertFalse(iterator.hasNext());
+
+        // Verify that removing a store wipes out its data.
+        dataIndexManager.removeStore(dataIndex);
+        dataIndex = dataIndexManager.addStore(indexName);
+        assertEquals(0, dataIndex.size());
+
+    }
+
+    /**
+     * Asserts that two index entries agree on their id and their location.
+     * No other property of the entries is compared.
+     *
+     * @param expected the entry carrying the expected id and location
+     * @param actual the entry to check
+     */
+    protected void assertEquals(IndexEntry expected, IndexEntry actual) {
+        assertEquals(expected.getId(), actual.getId());
+        assertEquals(expected.getLocation(), actual.getLocation());
+    }
+
+    /**
+     * Asserts that two locations have the same offset and the same data file
+     * id. The offset is checked first.
+     *
+     * @param expected the location carrying the expected values
+     * @param actual the location to check
+     */
+    protected void assertEquals(Location expected, Location actual) {
+        assertEquals(expected.getOffset(), actual.getOffset());
+        assertEquals(expected.getDataFileId(), actual.getDataFileId());
+    }
+
+    private Location createLocation(int fileId, int offset) {
+        Location rc = new Location();
+        rc.setDataFileId(fileId);
+        rc.setOffset(offset);
+        return rc;
+    }
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a ReferenceIndex.
+     * <p>
+     * Adds a batch of entries to the data index and references to them in the
+     * reference index, restarts the manager and checks that the references
+     * load back in insertion order; removes every other reference, restarts
+     * again and checks that exactly the untouched references remain; finally
+     * checks that removing and re-adding the reference index leaves it empty.
+     *
+     * @throws Exception if any index operation fails
+     */
+    public void testReferenceIndexCRD() throws Exception {
+        String indexName = getName();
+        String refName = getName() + "-ref";
+        DataIndex dataIndex = dataIndexManager.addStore(indexName);
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+
+        int count = 10;
+        IndexEntry ie[] = new IndexEntry[count];
+        for (int i = 0; i < count; i++) {
+            ie[i] = dataIndex.addMessage(Long.valueOf(i), createLocation(0, i));
+            assertNotNull(ie[i]);
+            assertNotNull(ie[i].getIndex());
+            assertNotNull(ie[i].getId());
+            assertNotNull(ie[i].getLocation());
+        }
+
+        for (int i = 0; i < count; i++) {
+            refIndex.addReference(ie[i]);
+        }
+
+        restartDataIndexManager();
+
+        dataIndex = dataIndexManager.getStore(indexName);
+        refIndex = dataIndex.getStore(refName);
+
+        assertEquals(count, refIndex.size());
+
+        // Ask for more than was stored to show that nothing extra comes back.
+        List<IndexEntry> loaded = refIndex.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (IndexEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getIndex());
+            assertEquals(ie[i], entry);
+            i++;
+        }
+
+        // Let's delete every other record (positions 0, 2, 4, ...).
+        int deleteCount = 0;
+        i = 0;
+        for (IndexEntry entry : loaded) {
+            if (i % 2 == 0) {
+                refIndex.remove(entry.getId());
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataIndexManager();
+        dataIndex = dataIndexManager.getStore(indexName);
+        refIndex = dataIndex.getStore(refName);
+
+        assertEquals(count - deleteCount, refIndex.size());
+
+        loaded = refIndex.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        // The survivors are the odd positions, still in insertion order. The
+        // check must be driven by the loop variable j: the counter i left
+        // over from the delete loop equals count at this point, so testing
+        // it would skip every comparison.
+        Iterator<IndexEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (j % 2 != 0) {
+                assertTrue(iterator.hasNext());
+                assertEquals(ie[j], iterator.next());
+            }
+        }
+        assertFalse(iterator.hasNext());
+
+        // Verify that removing a store wipes out its data.
+        dataIndex.removeStore(refIndex);
+        refIndex = dataIndex.addStore(refName);
+        assertEquals(0, refIndex.size());
+
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index.jpa;
+
+import java.io.File;
+
+import org.apache.activemq.broker.router.index.IndexTestSupport;
+import org.apache.activemq.broker.router.index.api.DataIndexManager;
+import org.apache.activemq.broker.router.store.journal.JournalDataStoreManagerFactory;
+
+/**
+ * Runs the {@link IndexTestSupport} test cases against the
+ * {@link DataIndexManager} produced by a
+ * {@link JournalDataStoreManagerFactory}. Each test gets its own data
+ * directory, named after the test, under <code>target/data</code>.
+ */
+public class JpaIndexTest extends IndexTestSupport {
+
+    @Override
+    protected DataIndexManager createDataIndexManager() throws Exception {
+        File dataDirectory = new File("target/data/" + getName());
+
+        JournalDataStoreManagerFactory factory = new JournalDataStoreManagerFactory();
+        factory.setDataDirectory(dataDirectory);
+        // Left disabled; enabling it sets the "openjpa.Log" entity manager
+        // property shown below.
+        //factory.getEntityManagerProperties().put("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        return factory.createDataIndexManager();
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalQueueTests.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalQueueTests.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalQueueTests.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalQueueTests.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.perf;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class OriginalQueueTests extends JmsTestSupport {
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setPersistent(true);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("tcp://localhost:0");
+        return brokerService;
+    }
+
+    private static final transient Log LOG = LogFactory.getLog(OriginalQueueTests.class);
+
+    // Tuning knobs, each read once from a system property at class load.
+
+    // "SAMPLE_DELAY" (default 5000): reported in seconds by a log message in
+    // testConcurrentSendReceive; the test does not actually wait this long.
+    private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
+    // "SAMPLES" (default 10000): number of measurement windows.
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10000"));
+    // Length of one measurement window in milliseconds (default 1000).
+    // NOTE(review): the property key is "SAMPLES_DURATION" (with an S), which
+    // does not match the constant name - confirm which spelling is intended.
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 1));
+    // "PRODUCER_COUNT" / "CONSUMER_COUNT" (default 10 each): number of
+    // producer and consumer threads started by the test.
+    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
+    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
+    // Delivery mode applied to every producer.
+    private static final int persistent = DeliveryMode.PERSISTENT;
+
+    // Destination the producers send to and the consumers read from.
+    // Presumably populated by the test framework from the "destination"
+    // values registered in initCombos() - confirm against JmsTestSupport.
+    public ActiveMQDestination destination;
+
+    /**
+     * @return the test suite built for this class by the inherited
+     *         <code>suite(Class)</code> helper
+     */
+    public static Test suite() {
+        final Test tests = suite(OriginalQueueTests.class);
+        return tests;
+    }
+
+    /**
+     * Runs this test class with the JUnit text runner.
+     * <p>
+     * NOTE(review): the class itself is handed to the runner rather than the
+     * result of {@link #suite()} - confirm that is intended.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(OriginalQueueTests.class);
+    }
+
+    /**
+     * Registers the values to use for the <code>destination</code> field: a
+     * single queue named "TEST".
+     */
+    public void initCombos() {
+        final Object[] destinations = { new ActiveMQQueue("TEST") };
+        addCombinationValues("destination", destinations);
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        ActiveMQConnectionFactory rc = new ActiveMQConnectionFactory(((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+        rc.setWatchTopicAdvisories(false);
+        rc.setUseAsyncSend(true);
+        return rc;
+    }
+
+    /**
+     * Starts PRODUCER_COUNT producer threads and CONSUMER_COUNT consumer
+     * threads against {@link #destination}, then logs the published and
+     * consumed message rates for SAMPLES windows of SAMPLE_DURATION
+     * milliseconds each. Nothing is asserted about the rates; the test only
+     * fails when a worker thread recorded an error.
+     *
+     * @throws Throwable the error recorded by a worker thread, if any
+     */
+    public void testConcurrentSendReceive() throws Throwable {
+
+        // Both semaphores start at 1 - (number of workers) permits, so the
+        // single acquire() further down only succeeds after every worker has
+        // called release() once.
+        final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        // Counted down once by this thread when sampling is over; tells the
+        // workers to finish.
+        final CountDownLatch sampleTimeDone = new CountDownLatch(1);
+
+        // Counters for the current sample window; reset at the start of each
+        // window by the sampling loop below.
+        final AtomicInteger producedMessages = new AtomicInteger(0);
+        final AtomicInteger receivedMessages = new AtomicInteger(0);
+
+        // Producer body: sends the same 1024-byte message in a loop until the
+        // latch is released.
+        final Callable<Object> producer = new Callable<Object>() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(destination);
+                producer.setDeliveryMode(persistent);
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(new byte[1024]);
+                connection.start();
+                connectionsEstablished.release();
+
+                // A zero timeout makes await() a non-blocking check of the
+                // latch.
+                while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
+                    producer.send(message);
+                    producedMessages.incrementAndGet();
+                }
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        // Consumer body: counts messages through a listener and prints a
+        // warning when a producer's sequence id does not increase.
+        final Callable<Object> consumer = new Callable<Object>() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                // Last producer sequence id seen per producer id, local to
+                // this consumer.
+                final HashMap<String, Long> lastMessageIds = new HashMap<String, Long>();
+
+                consumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message msg) {
+                        ActiveMQMessage m = (ActiveMQMessage) msg;
+                        String pid = m.getMessageId().getProducerId().toString();
+                        long current = m.getMessageId().getProducerSequenceId();
+                        Long last = lastMessageIds.get(pid);
+                        if (last != null) {
+                            if (current <= last) {
+                                // Only reported on stdout; does not fail the
+                                // test.
+                                System.out.println("Got out of order message from producer: " + pid + " got " + current + ", expected a message larger than " + last);
+                            }
+                        }
+                        lastMessageIds.put(pid, current);
+                        receivedMessages.incrementAndGet();
+                    }
+                });
+                connection.start();
+
+                connectionsEstablished.release();
+                // Block until sampling is over; the listener does the work
+                // in the meantime.
+                sampleTimeDone.await();
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        // Single slot shared by all workers: a later error overwrites an
+        // earlier one.
+        // NOTE(review): a worker that throws never reaches its release()
+        // calls, so the acquire() calls below would then block indefinitely
+        // instead of reporting the error - confirm and consider releasing in
+        // a finally block.
+        final Throwable workerError[] = new Throwable[1];
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            new Thread("Producer:" + i) {
+                public void run() {
+                    try {
+                        producer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        // Thread.sleep(5000);
+
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            new Thread("Consumer:" + i) {
+                public void run() {
+                    try {
+                        consumer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
+        connectionsEstablished.acquire();
+        // NOTE(review): the message announces a SAMPLE_DELAY wait, but the
+        // sleep below is commented out, so sampling starts immediately.
+        LOG.info("Producers and Consumers are now running.  Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds");
+        // Thread.sleep(1000 * 10);
+
+        LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
+
+        for (int i = 0; i < SAMPLES; i++) {
+
+            // Zero the counters, let the workers run for one window, then
+            // read the counters back.
+            long start = System.currentTimeMillis();
+            producedMessages.set(0);
+            receivedMessages.set(0);
+
+            Thread.sleep(SAMPLE_DURATION);
+
+            long end = System.currentTimeMillis();
+            int r = receivedMessages.get();
+            int p = producedMessages.get();
+
+            // Rates are scaled by the measured elapsed time, not by the
+            // nominal SAMPLE_DURATION.
+            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
+        }
+
+        LOG.info("Sample done.");
+        sampleTimeDone.countDown();
+
+        // Wait for every worker to close its connection, then surface any
+        // recorded worker error as the test failure.
+        workerDone.acquire();
+        if (workerError[0] != null) {
+            throw workerError[0];
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalTopicTests.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalTopicTests.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalTopicTests.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalTopicTests.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.perf;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class OriginalTopicTests extends JmsTestSupport {
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setPersistent(true);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("tcp://localhost:0");
+        return brokerService;
+    }
+
+    private static final transient Log LOG = LogFactory.getLog(OriginalTopicTests.class);
+
+    private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 1));
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "100000"));
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 1));
+    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "1"));
+    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
+    private static final int persistent = DeliveryMode.NON_PERSISTENT;
+
+    public ActiveMQDestination destination;
+
+    public static Test suite() {
+        return suite(OriginalTopicTests.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(OriginalTopicTests.class);
+    }
+
+    public void initCombos() {
+        addCombinationValues("destination", new Object[] { new ActiveMQTopic("TEST") });
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        ActiveMQConnectionFactory rc = new ActiveMQConnectionFactory(((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+        rc.setWatchTopicAdvisories(false);
+        return rc;
+    }
+
+    /**
+     * @throws Throwable
+     */
+    public void testConcurrentSendReceive() throws Throwable {
+
+        final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final CountDownLatch sampleTimeDone = new CountDownLatch(1);
+
+        final AtomicInteger producedMessages = new AtomicInteger(0);
+        final AtomicInteger receivedMessages = new AtomicInteger(0);
+
+        final Callable<Object> producer = new Callable<Object>() {
+
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(destination);
+                producer.setDeliveryMode(persistent);
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(new byte[1024]);
+                connection.start();
+                connectionsEstablished.release();
+
+                while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
+                    producer.send(message);
+                    producedMessages.incrementAndGet();
+                }
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Callable<Object> consumer = new Callable<Object>() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                consumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message msg) {
+                        receivedMessages.incrementAndGet();
+                    }
+                });
+                connection.start();
+
+                connectionsEstablished.release();
+                sampleTimeDone.await();
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Throwable workerError[] = new Throwable[1];
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            new Thread("Producer:" + i) {
+                public void run() {
+                    try {
+                        producer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        // Thread.sleep(5000);
+
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            new Thread("Consumer:" + i) {
+                public void run() {
+                    try {
+                        consumer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
+        connectionsEstablished.acquire();
+        LOG.info("Producers and Consumers are now running.  Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds");
+        // Thread.sleep(1000 * 10);
+
+        LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
+
+        for (int i = 0; i < SAMPLES; i++) {
+
+            long start = System.currentTimeMillis();
+            producedMessages.set(0);
+            receivedMessages.set(0);
+
+            Thread.sleep(SAMPLE_DURATION);
+
+            long end = System.currentTimeMillis();
+            int r = receivedMessages.get();
+            int p = producedMessages.get();
+
+            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
+        }
+
+        LOG.info("Sample done.");
+        sampleTimeDone.countDown();
+
+        workerDone.acquire();
+        if (workerError[0] != null) {
+            throw workerError[0];
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/QueueTests.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/QueueTests.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/QueueTests.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/QueueTests.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.perf;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RouterBrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.util.DuplicateAndMissedChecker;
+import org.apache.activemq.broker.router.util.OrderChecker;
+import org.apache.activemq.broker.router.util.Sequence;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class QueueTests extends JmsTestSupport {
+
+    private RouterBrokerService brokerService;
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        brokerService = new RouterBrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("tcp://localhost:0");
+        return brokerService;
+    }
+
+    private static final transient Log LOG = LogFactory.getLog(QueueTests.class);
+
+    private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10000"));
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 1));
+    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "1"));
+    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "1"));
+    private static final int persistent = DeliveryMode.NON_PERSISTENT;
+
+    public ActiveMQDestination destination;
+
+    public static Test suite() {
+        return suite(QueueTests.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(QueueTests.class);
+    }
+
+    public void initCombos() {
+        addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST") });
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        ActiveMQConnectionFactory rc = new ActiveMQConnectionFactory(((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+        rc.setWatchTopicAdvisories(false);
+        rc.setUseAsyncSend(true);
+        return rc;
+    }
+
+    DuplicateAndMissedChecker duplicateAndMissedChecker = new DuplicateAndMissedChecker();
+
+    /**
+     * @throws Throwable
+     */
+    public void testConcurrentSendReceive() throws Throwable {
+
+        final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final CountDownLatch sampleTimeDone = new CountDownLatch(1);
+
+        final AtomicInteger producedMessages = new AtomicInteger(0);
+        final AtomicInteger receivedMessages = new AtomicInteger(0);
+
+        final Callable<Object> producer = new Callable<Object>() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(destination);
+                producer.setDeliveryMode(persistent);
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(new byte[1024]);
+                connection.start();
+                connectionsEstablished.release();
+
+                while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
+                    producer.send(message);
+                    producedMessages.incrementAndGet();
+                }
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Callable<Object> consumer = new Callable<Object>() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                final OrderChecker checker = new OrderChecker(duplicateAndMissedChecker);
+
+                consumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message msg) {
+                        ActiveMQMessage m = (ActiveMQMessage) msg;
+                        checker.onMessageId(m.getMessageId());
+                        receivedMessages.incrementAndGet();
+                    }
+                });
+                connection.start();
+
+                connectionsEstablished.release();
+                sampleTimeDone.await();
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Throwable workerError[] = new Throwable[1];
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            new Thread("Producer:" + i) {
+                public void run() {
+                    try {
+                        producer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        // Thread.sleep(5000);
+
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            new Thread("Consumer:" + i) {
+                public void run() {
+                    try {
+                        consumer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
+        connectionsEstablished.acquire();
+        LOG.info("Producers and Consumers are now running.  Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds");
+        // Thread.sleep(1000 * 10);
+
+        LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
+
+        Set<Destination> destinations = brokerService.getRegionBroker().getRouter().getDestinationManager().getDestinations(destination);
+        Destination destination = destinations.iterator().next();
+
+        for (int i = 0; i < SAMPLES; i++) {
+
+            long start = System.currentTimeMillis();
+            producedMessages.set(0);
+            receivedMessages.set(0);
+
+            Thread.sleep(SAMPLE_DURATION);
+
+            long end = System.currentTimeMillis();
+            int r = receivedMessages.get();
+            int p = producedMessages.get();
+
+            int pu = destination.getSystemUsage().getMemoryUsage().getPercentUsage();
+            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec, mem % used: " + pu);
+
+            // Map<String, List<Sequence>> missingMessages =
+            // duplicateAndMissedChecker.findMissingMessages();
+            // for (Map.Entry<String, List<Sequence>> entry:
+            // missingMessages.entrySet()) {
+            // LOG.info(" Missing Messages: "+entry.getKey()+":
+            // "+entry.getValue());
+            // }
+
+            Map<String, List<Sequence>> received = duplicateAndMissedChecker.getReceivedMessages();
+            for (Map.Entry<String, List<Sequence>> entry : received.entrySet()) {
+                LOG.info(" Received Messages: " + entry.getKey() + ": " + entry.getValue());
+            }
+        }
+
+        LOG.info("Sample done.");
+        sampleTimeDone.countDown();
+
+        workerDone.acquire();
+        if (workerError[0] != null) {
+            throw workerError[0];
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/TopicTests.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/TopicTests.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/TopicTests.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/TopicTests.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.perf;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RouterBrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TopicTests extends JmsTestSupport {
+
+    private RouterBrokerService brokerService;
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        brokerService = new RouterBrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("tcp://localhost:0");
+        return brokerService;
+    }
+
+    private static final transient Log LOG = LogFactory.getLog(TopicTests.class);
+
+    private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 1));
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "100000"));
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 1));
+    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "1"));
+    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
+    private static final int persistent = DeliveryMode.NON_PERSISTENT;
+
+    public ActiveMQDestination destination;
+
+    public static Test suite() {
+        return suite(TopicTests.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(TopicTests.class);
+    }
+
+    public void initCombos() {
+        addCombinationValues("destination", new Object[] { new ActiveMQTopic("TEST") });
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        ActiveMQConnectionFactory rc = new ActiveMQConnectionFactory(((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+        rc.setWatchTopicAdvisories(false);
+        return rc;
+    }
+
+    /**
+     * @throws Throwable
+     */
+    public void testConcurrentSendReceive() throws Throwable {
+
+        final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final CountDownLatch sampleTimeDone = new CountDownLatch(1);
+
+        final AtomicInteger producedMessages = new AtomicInteger(0);
+        final AtomicInteger receivedMessages = new AtomicInteger(0);
+
+        final Callable<Object> producer = new Callable<Object>() {
+
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(destination);
+                producer.setDeliveryMode(persistent);
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(new byte[1024]);
+                connection.start();
+                connectionsEstablished.release();
+
+                while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
+                    producer.send(message);
+                    producedMessages.incrementAndGet();
+                }
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Callable<Object> consumer = new Callable<Object>() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                consumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message msg) {
+                        receivedMessages.incrementAndGet();
+                    }
+                });
+                connection.start();
+
+                connectionsEstablished.release();
+                sampleTimeDone.await();
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Throwable workerError[] = new Throwable[1];
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            new Thread("Producer:" + i) {
+                public void run() {
+                    try {
+                        producer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        // Thread.sleep(5000);
+
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            new Thread("Consumer:" + i) {
+                public void run() {
+                    try {
+                        consumer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
+        connectionsEstablished.acquire();
+        LOG.info("Producers and Consumers are now running.  Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds");
+        // Thread.sleep(1000 * 10);
+
+        LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
+
+        Set<Destination> destinations = brokerService.getRegionBroker().getRouter().getDestinationManager().getDestinations(destination);
+        Destination destination = destinations.iterator().next();
+
+        for (int i = 0; i < SAMPLES; i++) {
+
+            long start = System.currentTimeMillis();
+            producedMessages.set(0);
+            receivedMessages.set(0);
+
+            Thread.sleep(SAMPLE_DURATION);
+
+            long end = System.currentTimeMillis();
+            int r = receivedMessages.get();
+            int p = producedMessages.get();
+
+            int pu = destination.getSystemUsage().getMemoryUsage().getPercentUsage();
+            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec, mem % used: " + pu);
+        }
+
+        LOG.info("Sample done.");
+        sampleTimeDone.countDown();
+
+        workerDone.acquire();
+        if (workerError[0] != null) {
+            throw workerError[0];
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/queue/TheoryTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/queue/TheoryTest.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/queue/TheoryTest.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/queue/TheoryTest.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.queue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.router.util.Selectable;
+import org.apache.activemq.broker.router.util.Selector;
+import org.apache.activemq.broker.router.util.SelectorThreadPool;
+import org.apache.activemq.broker.router.util.Usage;
+
+public class TheoryTest extends TestCase {
+
+    // Shared selector from which every QueueProducer creates its store pump
+    // task; testTheory() adds it to the SelectorThreadPool.
+    static private final Selector<Runnable> prefetchSelector = new Selector<Runnable>();
+
+    // Capacity of each QueueConsumer's prefetch queue.
+    static int CLIENT_PREFETCH = 1000;
+
+    static class QueueConsumer<T> {
+        QueueProducer<T> destSub;
+        final ArrayBlockingQueue<T> subPrefetch = new ArrayBlockingQueue<T>(CLIENT_PREFETCH);
+
+        public boolean offer(T o) throws InterruptedException {
+            return subPrefetch.offer(o);
+        }
+
+        public void offer(List<T> list) {
+            for (Iterator<T> iterator = list.iterator(); iterator.hasNext();) {
+                T o = iterator.next();
+                if (subPrefetch.offer(o)) {
+                    iterator.remove();
+                } else {
+                    break;
+                }
+            }
+        }
+
+        public T poll() throws InterruptedException {
+            T rc = subPrefetch.poll(100, TimeUnit.MILLISECONDS);
+            if (rc != null && subPrefetch.size() < 100) {
+                destSub.wakeup();
+            }
+            return rc;
+        }
+
+        public QueueProducer<T> getDestSub() {
+            return destSub;
+        }
+
+        public void setDestSub(QueueProducer<T> destSub) {
+            this.destSub = destSub;
+        }
+
+        @Override
+        public String toString() {
+            return "Client Sub " + subPrefetch.size() + "/" + CLIENT_PREFETCH;
+        }
+    }
+
+    static class Store<T> {
+
+        LinkedList<T> items = new LinkedList<T>();
+
+        synchronized void add(T o) {
+            items.addLast(o);
+        }
+
+        public void load(List<T> prefetch, int max) throws InterruptedException {
+            // Thread.sleep(30);
+            synchronized (this) {
+                if (max < 1) {
+                    return;
+                }
+                Iterator<T> iterator = items.iterator();
+                for (int i = 0; i < max && iterator.hasNext(); i++) {
+                    prefetch.add(iterator.next());
+                    iterator.remove();
+                }
+            }
+        }
+
+        public String toString() {
+            return "Store " + items.size();
+        }
+    }
+
+    /**
+     * Destination-side half of the model. Items are handed straight to the
+     * next QueueConsumer while it accepts them; once it rejects one, items
+     * are spooled to the Store and a pump task moves them on to the consumer.
+     * The enqueueToStore flag records which mode is active and is guarded
+     * by the read/write lock.
+     */
+    static class QueueProducer<T> {
+        final Store<T> store = new Store<T>();
+        // Items loaded from the store that the consumer has not accepted yet.
+        final LinkedList<T> destPrefetch = new LinkedList<T>();
+        final QueueConsumer<T> next;
+        // Number of offer(T) calls made so far.
+        final AtomicLong enqueueCounter = new AtomicLong();
+
+        // offer()/wakeup() read enqueueToStore under the read lock; it is
+        // only changed while holding the write lock.
+        final ReadWriteLock rwl = new ReentrantReadWriteLock();
+        boolean enqueueToStore;
+
+        // Number of times the pump task has been run.
+        final AtomicLong pumpingCounter = new AtomicLong();
+
+        // Task created from the shared selector; each run counts itself and
+        // then calls pump().
+        protected Selectable<Runnable> storePumpTask = prefetchSelector.create(new Runnable() {
+            public void run() {
+                pumpingCounter.incrementAndGet();
+                try {
+                    pump();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        public QueueProducer(QueueConsumer<T> next) {
+            this.next = next;
+        }
+
+        /**
+         * Accepts an item, either passing it directly to the consumer or
+         * spooling it to the store.
+         *
+         * @return always true.
+         */
+        public boolean offer(T o) throws InterruptedException {
+            enqueueCounter.incrementAndGet();
+            rwl.readLock().lock();
+            try {
+                if (enqueueToStore) {
+                    store.add(o);
+                } else {
+                    if (!next.offer(o)) {
+                        // The consumer is full: give up the read lock and take
+                        // the write lock so the mode flag can be flipped.
+                        rwl.readLock().unlock();
+                        rwl.writeLock().lock();
+                        try {
+                            // Re-check: another thread may have switched modes
+                            // while no lock was held.
+                            if (!enqueueToStore) {
+                                // System.out.println("Next is full.. spooling
+                                // to store: "+enqueueCounter.get());
+                                enqueueToStore = true;
+                                storePumpTask.setEnabled(true);
+                            }
+                        } finally {
+                            // Re-acquire the read lock before releasing the
+                            // write lock, so the outer finally always has a
+                            // read lock to release.
+                            rwl.readLock().lock();
+                            rwl.writeLock().unlock();
+                        }
+                        store.add(o);
+                        wakeup();
+                    }
+                }
+            } finally {
+                rwl.readLock().unlock();
+            }
+            return true;
+        }
+
+        // Number of pump() invocations currently in progress; pump() prints a
+        // warning when it sees more than one.
+        AtomicInteger t = new AtomicInteger();
+
+        /**
+         * Moves items from the store, through destPrefetch, to the consumer
+         * until either everything is drained or the consumer stops accepting.
+         *
+         * @return false once the store and destPrefetch are both empty, in
+         *         which case enqueueToStore has been cleared so that offers
+         *         go directly to the consumer again; true when the consumer
+         *         accepted none of the prefetched items.
+         * @throws InterruptedException
+         */
+        public boolean pump() throws InterruptedException {
+            t.incrementAndGet();
+            try {
+                for (;;) {
+                    if (destPrefetch.isEmpty()) {
+                        store.load(destPrefetch, MAX_PREFETCH - destPrefetch.size());
+                    }
+                    if (destPrefetch.isEmpty()) {
+                        // Looks like the store is empty..
+                        // try to enqueue directly to the next guy.
+                        rwl.writeLock().lock();
+                        try {
+                            // Load again under the write lock: offer() may
+                            // have added to the store since the first load.
+                            store.load(destPrefetch, MAX_PREFETCH - destPrefetch.size());
+                            if (destPrefetch.isEmpty()) {
+                                // System.out.println("store is empty.. will
+                                // enqueue directly: "+enqueueCounter.get());
+                                enqueueToStore = false;
+                                return false;
+                            }
+                        } finally {
+                            rwl.writeLock().unlock();
+                        }
+                    }
+
+                    // Try to offer those prefetch messages to the
+                    // next guy. If he takes any he will remove those messages.
+                    int size = destPrefetch.size();
+                    next.offer(destPrefetch);
+                    if (destPrefetch.size() == size) {
+                        // Nothing was accepted; stop until woken up again.
+                        return true;
+                    }
+                }
+            } finally {
+                // More than one pump() in flight at this point is reported.
+                if (t.get() > 1) {
+                    System.out.println("Crap!");
+                }
+                t.decrementAndGet();
+            }
+        }
+
+        // Number of wakeup() calls that found the producer in store mode.
+        final AtomicLong wakupCounter = new AtomicLong();
+
+        /**
+         * Enables the pump task again, but only while items are being spooled
+         * to the store. Called by the consumer from poll() and by offer().
+         */
+        public void wakeup() {
+            rwl.readLock().lock();
+            try {
+                if (enqueueToStore) {
+                    wakupCounter.incrementAndGet();
+                    storePumpTask.setEnabled(true);
+                }
+            } finally {
+                rwl.readLock().unlock();
+            }
+        }
+
+        // Bound used when computing how many items to load from the store
+        // into destPrefetch.
+        private int MAX_PREFETCH = 1000;
+
+        public String toString() {
+            return "DestinationSubscription enqueueToStore:" + this.enqueueToStore + ", " + destPrefetch.size() + "/" + MAX_PREFETCH + ", " + store + ", " + next;
+        }
+
+    }
+
+    // Length of one sampling window in milliseconds, read from the
+    // "SAMPLES_DURATION" system property and defaulting to 1000.
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 1));
+
+    /**
+     * Runs 10 consumer threads, each draining its own QueueConsumer, and 10
+     * producer threads that offer an increasing sequence number to every
+     * subscription in turn, then prints the publish and consume rates once
+     * per sampling window. The sampling loop has no exit condition, so the
+     * method only ends if the sampling thread is interrupted.
+     */
+    public void testTheory() throws InterruptedException {
+        final int producerCount = 10;
+        final int consumerCount = 10;
+
+        SelectorThreadPool pool = new SelectorThreadPool("ActiveMQ ", 10, 50);
+        pool.add(prefetchSelector);
+        pool.start();
+
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicInteger received = new AtomicInteger(0);
+        final Usage memory = new Usage();
+        memory.setLimit(1000);
+        memory.start();
+
+        // One producer/consumer pair per consumer thread.
+        final ArrayList<QueueProducer<Long>> subscriptions = new ArrayList<QueueProducer<Long>>();
+        for (int c = 0; c < consumerCount; c++) {
+            final QueueConsumer<Long> client = new QueueConsumer<Long>();
+            QueueProducer<Long> subscription = new QueueProducer<Long>(client);
+            client.setDestSub(subscription);
+            subscriptions.add(subscription);
+
+            Runnable drain = new Runnable() {
+                public void run() {
+                    try {
+                        for (;;) {
+                            if (client.poll() != null) {
+                                received.incrementAndGet();
+                                memory.decreaseUsage(1);
+                            }
+                            // Thread.sleep(10);
+                        }
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            new Thread(drain, "Consumer: " + c).start();
+        }
+
+        for (int p = 0; p < producerCount; p++) {
+            Runnable feed = new Runnable() {
+                public void run() {
+                    try {
+                        long sequence = 0;
+                        while (true) {
+                            for (QueueProducer<Long> target : subscriptions) {
+                                // Retry until waitForSpace reports success.
+                                while (!memory.waitForSpace(100)) {
+                                }
+                                if (target.offer(sequence)) {
+                                    memory.increaseUsage(1);
+                                    sent.incrementAndGet();
+                                }
+                            }
+                            sequence++;
+                        }
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            new Thread(feed, "Producer: " + p).start();
+        }
+
+        // Sample forever: reset the counters, wait one window, report.
+        while (true) {
+
+            long windowStart = System.currentTimeMillis();
+            sent.set(0);
+            received.set(0);
+
+            Thread.sleep(SAMPLE_DURATION);
+
+            long elapsed = System.currentTimeMillis() - windowStart;
+            int consumed = received.get();
+            int published = sent.get();
+
+            float publishRate = published * 1000f / elapsed;
+            float consumeRate = consumed * 1000f / elapsed;
+            System.out.println("published: " + published + " msgs at " + publishRate + " msgs/sec, consumed: " + consumed + " msgs at " + consumeRate + " msgs/sec");
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.StubDestination;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.usage.SystemUsage;
+
+abstract public class StoreTestSupport extends TestCase {
+
+    // Manager under test; created in setUp() and replaced by
+    // restartDataStoreManager().
+    DataStoreManager dataStoreManager;
+    // Producer id used for the message ids built in createTextMessage().
+    private ProducerId producerId;
+    // Incremented for every message created by createTextMessage().
+    private int msgIdGenerator;
+    // Destination set on every message created by createTextMessage().
+    private ActiveMQQueue destinationName;
+
+    // Stub destination handed to the stores via setDestination(); the only
+    // behaviour it overrides is returning its own SystemUsage instance.
+    private Destination destination = new StubDestination() {
+        SystemUsage systemUsage = new SystemUsage();
+
+        @Override
+        public SystemUsage getSystemUsage() {
+            return systemUsage;
+        }
+    };
+
+    /**
+     * Creates and starts the manager under test, removes every store it
+     * reports, and prepares the producer id and destination name used by
+     * createTextMessage().
+     */
+    @Override
+    protected void setUp() throws Exception {
+        dataStoreManager = createDataStoreManager();
+        dataStoreManager.start();
+        // Remove whatever stores the freshly started manager already knows.
+        for (DataStore existing : dataStoreManager.getStores()) {
+            dataStoreManager.removeStore(existing);
+        }
+        ConnectionId connectionId = new ConnectionId("ID:test-host:1");
+        SessionId sessionId = new SessionId(connectionId, 1);
+        producerId = new ProducerId(sessionId, 1);
+        destinationName = new ActiveMQQueue("test");
+    }
+
+    /**
+     * Supplies the DataStoreManager implementation to test. Called by
+     * setUp() and again on every restartDataStoreManager().
+     */
+    abstract protected DataStoreManager createDataStoreManager() throws Exception;
+
+    /**
+     * Stops the current manager, if there is one, then creates and starts a
+     * new one in its place.
+     */
+    protected void restartDataStoreManager() throws Exception {
+        DataStoreManager previous = dataStoreManager;
+        if (previous != null) {
+            previous.stop();
+        }
+        DataStoreManager replacement = createDataStoreManager();
+        dataStoreManager = replacement;
+        replacement.start();
+    }
+
+    /** Stops the manager under test and drops the reference to it. */
+    @Override
+    protected void tearDown() throws Exception {
+        DataStoreManager current = dataStoreManager;
+        current.stop();
+        dataStoreManager = null;
+    }
+
+    /**
+     * Verify that the create, retrieve and delete operations work properly
+     * against a DataStore, and that their effects are still visible after
+     * the manager has been restarted.
+     * 
+     * @throws Exception
+     */
+    public void testDataStoreCRD() throws Exception {
+        String storeName = getName();
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+
+        int count = 10;
+        CacheEntry ce[] = new CacheEntry[count];
+        for (int i = 0; i < count; i++) {
+            ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: " + i), null);
+            assertNotNull(ce[i]);
+            assertNotNull(ce[i].getStore());
+            assertNotNull(ce[i].getId());
+            assertNotNull(ce[i].getMessage());
+        }
+
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+
+        assertEquals(count, dataStore.size());
+
+        List<CacheEntry> loaded = dataStore.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (CacheEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getStore());
+            assertEquals(ce[i], entry);
+            i++;
+        }
+
+        // Let's delete every other record (the even-indexed ones).
+        int deleteCount = 0;
+        i = 0;
+        for (CacheEntry entry : loaded) {
+            if (i % 2 == 0) {
+                dataStore.remove(entry.getId(), null);
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+
+        assertEquals(count - deleteCount, dataStore.size());
+
+        loaded = dataStore.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        // The survivors must be the odd-indexed entries, in insertion order.
+        // The condition has to test the loop variable j: i is left equal to
+        // count by the loop above, so testing i would skip every comparison.
+        Iterator<CacheEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (j % 2 != 0) {
+                CacheEntry entry = iterator.next();
+                assertEquals(ce[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out its data.
+        dataStoreManager.removeStore(dataStore);
+        dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        assertEquals(0, dataStore.size());
+
+    }
+
+    /**
+     * Verify that the create, retrieve and delete operations work properly
+     * against a ReferenceStore, and that their effects are still visible
+     * after the manager has been restarted.
+     * 
+     * @throws Exception
+     */
+    public void testReferenceStoreCRD() throws Exception {
+        String storeName = getName();
+        String refName = getName() + "-ref";
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        ReferenceStore refStore = dataStore.addStore(refName);
+
+        int count = 10;
+        CacheEntry ce[] = new CacheEntry[count];
+        for (int i = 0; i < count; i++) {
+            ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: " + i), null);
+            assertNotNull(ce[i]);
+            assertNotNull(ce[i].getStore());
+            assertNotNull(ce[i].getId());
+            assertNotNull(ce[i].getMessage());
+        }
+
+        for (int i = 0; i < count; i++) {
+            refStore.addReference(ce[i]);
+        }
+
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(count, refStore.size());
+
+        List<CacheEntry> loaded = refStore.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (CacheEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getStore());
+            assertEquals(ce[i], entry);
+            i++;
+        }
+
+        // Let's delete every other record (the even-indexed ones).
+        int deleteCount = 0;
+        i = 0;
+        for (CacheEntry entry : loaded) {
+            if (i % 2 == 0) {
+                refStore.remove(entry.getId(), null);
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(count - deleteCount, refStore.size());
+
+        loaded = refStore.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        // The survivors must be the odd-indexed entries, in insertion order.
+        // The condition has to test the loop variable j: i is left equal to
+        // count by the loop above, so testing i would skip every comparison.
+        Iterator<CacheEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (j % 2 != 0) {
+                CacheEntry entry = iterator.next();
+                assertEquals(ce[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out its data.
+        dataStore.removeStore(refStore);
+        refStore = dataStore.addStore(refName);
+        assertEquals(0, refStore.size());
+
+    }
+
+    /**
+     * Verify that adding and removing a DataStore is reflected by the
+     * manager immediately and is still in effect after a restart.
+     * 
+     * @throws Exception
+     */
+    public void testCreateDestroyDataStore() throws Exception {
+        final String name = getName();
+
+        // Nothing is registered under this name to begin with.
+        assertNull(dataStoreManager.getStore(name));
+        List<DataStore> known = dataStoreManager.getStores();
+        assertTrue(known.isEmpty());
+
+        DataStore store = dataStoreManager.addStore(name);
+        assertNotNull(store);
+        store.setDestination(destination);
+
+        assertSame(store, dataStoreManager.getStore(name));
+        known = dataStoreManager.getStores();
+        assertEquals(1, known.size());
+        assertTrue(known.contains(store));
+
+        // Verify that the data store create was persisted between restart.
+        restartDataStoreManager();
+
+        store = dataStoreManager.getStore(name);
+        assertNotNull(store);
+        assertEquals(name, store.getName());
+        known = dataStoreManager.getStores();
+        assertEquals(1, known.size());
+        assertTrue(known.contains(store));
+
+        // Verify that the data store remove was persisted between restart.
+        dataStoreManager.removeStore(store);
+        restartDataStoreManager();
+
+        store = dataStoreManager.getStore(name);
+        assertNull(store);
+        known = dataStoreManager.getStores();
+        assertTrue(known.isEmpty());
+    }
+
+    /**
+     * Verify that adding and removing a ReferenceStore on a DataStore is
+     * reflected immediately and is still in effect after a restart.
+     * 
+     * @throws Exception
+     */
+    public void testCreateDestroyReferenceStore() throws Exception {
+
+        final String dataName = getName();
+        final String refName = getName() + "-ref";
+        DataStore owner = dataStoreManager.addStore(dataName);
+        owner.setDestination(destination);
+
+        // The new data store starts without any reference stores.
+        assertNull(owner.getStore(refName));
+        List<ReferenceStore> known = owner.getStores();
+        assertTrue(known.isEmpty());
+
+        ReferenceStore refStore = owner.addStore(refName);
+        assertNotNull(refStore);
+
+        assertSame(refStore, owner.getStore(refName));
+        known = owner.getStores();
+        assertEquals(1, known.size());
+        assertTrue(known.contains(refStore));
+
+        // Verify that the reference store create was persisted between restart.
+        restartDataStoreManager();
+        owner = dataStoreManager.getStore(dataName);
+
+        refStore = owner.getStore(refName);
+        assertNotNull(refStore);
+        assertEquals(refName, refStore.getName());
+        known = owner.getStores();
+        assertEquals(1, known.size());
+        assertTrue(known.contains(refStore));
+
+        // Verify that the reference store remove was persisted between restart.
+        owner.removeStore(refStore);
+        restartDataStoreManager();
+        owner = dataStoreManager.getStore(dataName);
+
+        refStore = owner.getStore(refName);
+        assertNull(refStore);
+        known = owner.getStores();
+        assertTrue(known.isEmpty());
+
+    }
+
+    /**
+     * Verify that store properties can be stored without any issues.
+     * 
+     * @throws Exception
+     */
+    public void testStoreProperties() throws Exception {
+
+        String storeName = getName();
+
+        // Make a relatively large property object..
+        Map<String, String> properties = new HashMap<String, String>();
+        for (int i = 0; i < 1000; i++) {
+            properties.put("key" + i, "value" + i);
+        }
+
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        dataStore.setProperties(properties);
+        assertEquals(properties, dataStore.getProperties());
+
+        // Restart and verify the the properties were preserved.
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertEquals(properties, dataStore.getProperties());
+
+        dataStore.setProperties(null);
+
+        String refName = storeName + "-ref";
+        ReferenceStore refStore = dataStore.addStore(refName);
+        refStore.setProperties(properties);
+
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertNull(dataStore.getProperties());
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(properties, refStore.getProperties());
+
+    }
+
+    /**
+     * Builds a non-persistent text message addressed to the test destination,
+     * giving it the next message id from this test's producer.
+     */
+    private Message createTextMessage(String text) throws MessageNotWriteableException {
+        ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
+        MessageId nextId = new MessageId(producerId, ++msgIdGenerator);
+        textMessage.setMessageId(nextId);
+        textMessage.setDestination(destinationName);
+        textMessage.setPersistent(false);
+        textMessage.setText(text);
+        return textMessage;
+    }
+
+    /**
+     * Asserts that two cache entries match by comparing their ids and their
+     * messages; no other part of the entry is compared.
+     */
+    protected void assertEquals(CacheEntry expected, CacheEntry actual) throws JMSException {
+        assertEquals(expected.getId(), actual.getId());
+        assertEquals(expected.getMessage(), actual.getMessage());
+    }
+
+    /**
+     * Asserts that two messages match. Both arguments are cast to
+     * ActiveMQTextMessage, so any other message type fails with a
+     * ClassCastException.
+     */
+    protected void assertEquals(Message expected, Message actual) throws JMSException {
+        assertEquals((ActiveMQTextMessage) expected, (ActiveMQTextMessage) actual);
+    }
+
+    /**
+     * Asserts that two text messages match by comparing their message ids
+     * and their text.
+     */
+    protected void assertEquals(ActiveMQTextMessage expected, ActiveMQTextMessage actual) throws JMSException {
+        assertEquals(expected.getMessageId(), actual.getMessageId());
+        assertEquals(expected.getText(), actual.getText());
+    }
+
+}



Mime
View raw message