activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r636615 [2/2] - in /activemq/sandbox/activemq-router/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/router/api/ main/java/org/apache/activemq/broker/router/core/ main/java/org/apache/activemq/broker/router/c...
Date Thu, 13 Mar 2008 03:16:23 GMT
Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java
(added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java
Wed Mar 12 20:16:19 2008
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.router.index.api.DataIndex;
+import org.apache.activemq.broker.router.index.api.IndexEntry;
+import org.apache.activemq.broker.router.index.api.ReferenceIndex;
+
+abstract public class IndexThreadingTestSupport extends IndexTestSupport {
+
+    AtomicBoolean stopping = new AtomicBoolean();
+    AtomicInteger counter = new AtomicInteger();
+    
+    class Publisher extends Thread {
+        private final ReferenceIndex refStore;
+        private DataIndex dataIndex;
+        
+        AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+        AtomicReference<IndexEntry> lastIndexEntry = new AtomicReference<IndexEntry>();
+        private final int id;
+
+        public Publisher(ReferenceIndex refIndex, int id) {
+            super("Publisher: "+id);
+            this.refStore = refIndex;
+            this.id = id;
+            this.dataIndex = refIndex.getDataIndex();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while ( !stopping.get() ) {
+                    addRecord();
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+                error.set(e);
+            }
+        }
+
+        private void addRecord() throws Exception {
+            IndexEntry ie;
+            synchronized(this) {
+                long i = getNextId();
+                ie = dataIndex.addMessage(i, createLocation(id, (int) i));              
         
+            }
+            assertNotNull(ie);
+            refStore.addReference(ie);
+            lastIndexEntry.set(ie);   
+            counter.incrementAndGet();
+        }
+
+    }
+    
+
+    long nextId;
+    private long getNextId() {
+        return nextId++;
+    }
+    
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a ReferenceStore.
+     * 
+     * @throws Exception
+     */
+    public void testReferenceStoreCRD() throws Exception {
+        
+        String storeName = getName();
+        String refName = getName() + "-ref";
+        DataIndex dataIndex = dataIndexManager.addStore(storeName);
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+
+        // Start 2 publishers...
+        Publisher p1 = new Publisher(refIndex, 1);
+        p1.start();
+        Publisher p2 = new Publisher(refIndex, 2);
+        p2.start();
+        
+        long processed=0;
+        try {
+            int testDuration = 1000*5;
+            long start = System.currentTimeMillis();
+            while( true ) {
+                if( System.currentTimeMillis()-start > testDuration ) {
+                    break;
+                }
+                int published = counter.getAndSet(0);
+                if( published > 0 ) {
+                    List<IndexEntry> loaded = refIndex.remove(null, null, published);
+                    assertEquals(published, loaded.size() );
+                    Thread.sleep(10);
+                    processed+=published;
+                } else {
+                    Thread.sleep(100);
+                }
+            }
+        } finally {
+            stopping.set(true);
+            p1.join();
+            p2.join();
+            System.out.println("Processed: "+processed+" messages.");            
+        }
+    }
+
+}

Copied: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java
(from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java?p2=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java&p1=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java&r1=635708&r2=636615&rev=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java
(original)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java
Wed Mar 12 20:16:19 2008
@@ -18,11 +18,11 @@
 
 import java.io.File;
 
-import org.apache.activemq.broker.router.index.IndexTestSupport;
+import org.apache.activemq.broker.router.index.IndexBasicTestSupport;
 import org.apache.activemq.broker.router.index.api.DataIndexManager;
 import org.apache.activemq.broker.router.store.journal.JournalDataStoreManagerFactory;
 
-public class JpaIndexTest extends IndexTestSupport {
+public class JpaBasicIndexTest extends IndexBasicTestSupport {
 
     @Override
     protected DataIndexManager createDataIndexManager() throws Exception {

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java
(added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java
Wed Mar 12 20:16:19 2008
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index.jpa;
+
+import java.io.File;
+
+import org.apache.activemq.broker.router.index.IndexThreadingTestSupport;
+import org.apache.activemq.broker.router.index.api.DataIndexManager;
+import org.apache.activemq.broker.router.store.journal.JournalDataStoreManagerFactory;
+
+public class JpaThreadingIndexTest extends IndexThreadingTestSupport {
+
+    @Override
+    protected DataIndexManager createDataIndexManager() throws Exception {
+        JournalDataStoreManagerFactory factory = new JournalDataStoreManagerFactory();
+        factory.setDataDirectory(new File("target/data/" + getName()));
+//        factory.getEntityManagerProperties().put("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        return factory.createDataIndexManager();
+    }
+
+}

Modified: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java
(original)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java
Wed Mar 12 20:16:19 2008
@@ -74,9 +74,9 @@
     private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY",
"" + 1000 * 5));
     private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10000"));
     private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION",
"" + 1000 * 1));
-    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT",
"1"));
-    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT",
"1"));
-    private static final int persistent = DeliveryMode.NON_PERSISTENT;
+    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT",
"10"));
+    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT",
"10"));
+    private static final int persistent = DeliveryMode.PERSISTENT;
 
     public ActiveMQDestination destination;
 

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java
(added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java
Wed Mar 12 20:16:19 2008
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+
+abstract public class StoreBasicTestSupport extends StoreTestSupport {
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a DataStore.
+     * 
+     * @throws Exception
+     */
+    public void testDataStoreCRD() throws Exception {
+        String storeName = getName();
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+
+        int count = 10;
+        CacheEntry ce[] = new CacheEntry[count];
+        for (int i = 0; i < count; i++) {
+            ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: "
+ i), null);
+            assertNotNull(ce[i]);
+            assertNotNull(ce[i].getStore());
+            assertNotNull(ce[i].getId());
+            assertNotNull(ce[i].getMessage());
+        }
+
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+
+        assertEquals(count, dataStore.size());
+
+        List<CacheEntry> loaded = dataStore.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (CacheEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getStore());
+            assertEquals(ce[i], entry);
+            i++;
+        }
+
+        // Lets delete every other record.
+        int deleteCount = 0;
+        i = 0;
+        for (CacheEntry entry : loaded) {
+            if (i % 2 == 0) {
+                dataStore.remove(entry.getId(), null);
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+
+        assertEquals(count - deleteCount, dataStore.size());
+
+        loaded = dataStore.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        Iterator<CacheEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (!(i % 2 == 0)) {
+                CacheEntry entry = iterator.next();
+                assertEquals(ce[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out his data.
+        dataStoreManager.removeStore(dataStore);
+        dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        assertEquals(0, dataStore.size());
+
+    }
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a ReferenceStore.
+     * 
+     * @throws Exception
+     */
+    public void testReferenceStoreCRD() throws Exception {
+        String storeName = getName();
+        String refName = getName() + "-ref";
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        ReferenceStore refStore = dataStore.addStore(refName);
+
+        int count = 10;
+        CacheEntry ce[] = new CacheEntry[count];
+        for (int i = 0; i < count; i++) {
+            ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: "
+ i), null);
+            assertNotNull(ce[i]);
+            assertNotNull(ce[i].getStore());
+            assertNotNull(ce[i].getId());
+            assertNotNull(ce[i].getMessage());
+        }
+
+        for (int i = 0; i < count; i++) {
+            refStore.addReference(ce[i]);
+        }
+
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(count, refStore.size());
+
+        List<CacheEntry> loaded = refStore.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (CacheEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getStore());
+            assertEquals(ce[i], entry);
+            i++;
+        }
+
+        // Lets delete every other record.
+        int deleteCount = 0;
+        i = 0;
+        for (CacheEntry entry : loaded) {
+            if (i % 2 == 0) {
+                refStore.remove(entry.getId(), null);
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(count - deleteCount, refStore.size());
+
+        loaded = refStore.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        Iterator<CacheEntry> iterator = loaded.iterator();
+        for (int j = 0; j < count; j++) {
+            if (!(i % 2 == 0)) {
+                CacheEntry entry = iterator.next();
+                assertEquals(ce[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out his data.
+        dataStore.removeStore(refStore);
+        refStore = dataStore.addStore(refName);
+        assertEquals(0, refStore.size());
+
+    }
+
+    public void testCreateDestroyDataStore() throws Exception {
+        String storeName = getName();
+
+        assertNull(dataStoreManager.getStore(storeName));
+        List<DataStore> storees = dataStoreManager.getStores();
+        assertTrue(storees.isEmpty());
+
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        assertNotNull(dataStore);
+        dataStore.setDestination(destination);
+
+        assertSame(dataStore, dataStoreManager.getStore(storeName));
+        storees = dataStoreManager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store create was persisted between restart.
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertNotNull(dataStore);
+        assertEquals(storeName, dataStore.getName());
+        storees = dataStoreManager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store remove was persisted between restart.
+        dataStoreManager.removeStore(dataStore);
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertNull(dataStore);
+        storees = dataStoreManager.getStores();
+        assertTrue(storees.isEmpty());
+    }
+
+    public void testCreateDestroyReferenceStore() throws Exception {
+
+        String dataName = getName();
+        String refName = getName() + "-ref";
+        DataStore manager = dataStoreManager.addStore(dataName);
+        manager.setDestination(destination);
+
+        assertNull(manager.getStore(refName));
+        List<ReferenceStore> storees = manager.getStores();
+        assertTrue(storees.isEmpty());
+
+        ReferenceStore dataStore = manager.addStore(refName);
+        assertNotNull(dataStore);
+
+        assertSame(dataStore, manager.getStore(refName));
+        storees = manager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store create was persisted between restart.
+        restartDataStoreManager();
+        manager = dataStoreManager.getStore(dataName);
+
+        dataStore = manager.getStore(refName);
+        assertNotNull(dataStore);
+        assertEquals(refName, dataStore.getName());
+        storees = manager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store remove was persisted between restart.
+        manager.removeStore(dataStore);
+        restartDataStoreManager();
+        manager = dataStoreManager.getStore(dataName);
+
+        dataStore = manager.getStore(refName);
+        assertNull(dataStore);
+        storees = manager.getStores();
+        assertTrue(storees.isEmpty());
+
+    }
+
+    /**
+     * Verify that store properties can be stored without any issues.
+     * 
+     * @throws Exception
+     */
+    public void testStoreProperties() throws Exception {
+
+        String storeName = getName();
+
+        // Make a relatively large property object..
+        Map<String, String> properties = new HashMap<String, String>();
+        for (int i = 0; i < 1000; i++) {
+            properties.put("key" + i, "value" + i);
+        }
+
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        dataStore.setProperties(properties);
+        assertEquals(properties, dataStore.getProperties());
+
+        // Restart and verify the the properties were preserved.
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertEquals(properties, dataStore.getProperties());
+
+        dataStore.setProperties(null);
+
+        String refName = storeName + "-ref";
+        ReferenceStore refStore = dataStore.addStore(refName);
+        refStore.setProperties(properties);
+
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertNull(dataStore.getProperties());
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(properties, refStore.getProperties());
+
+    }
+}

Modified: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java?rev=636615&r1=636614&r2=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java
(original)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java
Wed Mar 12 20:16:19 2008
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.broker.router.store;
 
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import javax.jms.JMSException;
 import javax.jms.MessageNotWriteableException;
@@ -31,7 +28,6 @@
 import org.apache.activemq.broker.router.store.api.CacheEntry;
 import org.apache.activemq.broker.router.store.api.DataStore;
 import org.apache.activemq.broker.router.store.api.DataStoreManager;
-import org.apache.activemq.broker.router.store.api.ReferenceStore;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ConnectionId;
@@ -43,12 +39,12 @@
 
 abstract public class StoreTestSupport extends TestCase {
 
-    DataStoreManager dataStoreManager;
-    private ProducerId producerId;
-    private int msgIdGenerator;
-    private ActiveMQQueue destinationName;
+    protected DataStoreManager dataStoreManager;
+    protected ProducerId producerId;
+    protected int msgIdGenerator;
+    protected ActiveMQQueue destinationName;
 
-    private Destination destination = new StubDestination() {
+    protected Destination destination = new StubDestination() {
         SystemUsage systemUsage = new SystemUsage();
 
         @Override
@@ -85,287 +81,7 @@
         dataStoreManager = null;
     }
 
-    /**
-     * Verify that the create retrieve and delete operations work properly
-     * against a DataStore.
-     * 
-     * @throws Exception
-     */
-    public void testDataStoreCRD() throws Exception {
-        String storeName = getName();
-        DataStore dataStore = dataStoreManager.addStore(storeName);
-        dataStore.setDestination(destination);
-
-        int count = 10;
-        CacheEntry ce[] = new CacheEntry[count];
-        for (int i = 0; i < count; i++) {
-            ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: "
+ i), null);
-            assertNotNull(ce[i]);
-            assertNotNull(ce[i].getStore());
-            assertNotNull(ce[i].getId());
-            assertNotNull(ce[i].getMessage());
-        }
-
-        restartDataStoreManager();
-        dataStore = dataStoreManager.getStore(storeName);
-        dataStore.setDestination(destination);
-
-        assertEquals(count, dataStore.size());
-
-        List<CacheEntry> loaded = dataStore.load(null, null, count * 2);
-        assertEquals(count, loaded.size());
-
-        // Verify that all the entries were loaded in the order inserted.
-        int i = 0;
-        for (CacheEntry entry : loaded) {
-            assertNotNull(entry);
-            assertNotNull(entry.getStore());
-            assertEquals(ce[i], entry);
-            i++;
-        }
-
-        // Lets delete every other record.
-        int deleteCount = 0;
-        i = 0;
-        for (CacheEntry entry : loaded) {
-            if (i % 2 == 0) {
-                dataStore.remove(entry.getId(), null);
-                deleteCount++;
-            }
-            i++;
-        }
-
-        // Restart and verify that the right records were removed.
-        restartDataStoreManager();
-        dataStore = dataStoreManager.getStore(storeName);
-        dataStore.setDestination(destination);
-
-        assertEquals(count - deleteCount, dataStore.size());
-
-        loaded = dataStore.load(null, null, count * 2);
-        assertEquals(count - deleteCount, loaded.size());
-
-        Iterator<CacheEntry> iterator = loaded.iterator();
-        for (int j = 0; j < count; j++) {
-            if (!(i % 2 == 0)) {
-                CacheEntry entry = iterator.next();
-                assertEquals(ce[j], entry);
-            }
-        }
-
-        // Verify that removing a store wipes out his data.
-        dataStoreManager.removeStore(dataStore);
-        dataStore = dataStoreManager.addStore(storeName);
-        dataStore.setDestination(destination);
-        assertEquals(0, dataStore.size());
-
-    }
-
-    /**
-     * Verify that the create retrieve and delete operations work properly
-     * against a ReferenceStore.
-     * 
-     * @throws Exception
-     */
-    public void testReferenceStoreCRD() throws Exception {
-        String storeName = getName();
-        String refName = getName() + "-ref";
-        DataStore dataStore = dataStoreManager.addStore(storeName);
-        dataStore.setDestination(destination);
-        ReferenceStore refStore = dataStore.addStore(refName);
-
-        int count = 10;
-        CacheEntry ce[] = new CacheEntry[count];
-        for (int i = 0; i < count; i++) {
-            ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: "
+ i), null);
-            assertNotNull(ce[i]);
-            assertNotNull(ce[i].getStore());
-            assertNotNull(ce[i].getId());
-            assertNotNull(ce[i].getMessage());
-        }
-
-        for (int i = 0; i < count; i++) {
-            refStore.addReference(ce[i]);
-        }
-
-        restartDataStoreManager();
-
-        dataStore = dataStoreManager.getStore(storeName);
-        dataStore.setDestination(destination);
-        refStore = dataStore.getStore(refName);
-
-        assertEquals(count, refStore.size());
-
-        List<CacheEntry> loaded = refStore.load(null, null, count * 2);
-        assertEquals(count, loaded.size());
-
-        // Verify that all the entries were loaded in the order inserted.
-        int i = 0;
-        for (CacheEntry entry : loaded) {
-            assertNotNull(entry);
-            assertNotNull(entry.getStore());
-            assertEquals(ce[i], entry);
-            i++;
-        }
-
-        // Lets delete every other record.
-        int deleteCount = 0;
-        i = 0;
-        for (CacheEntry entry : loaded) {
-            if (i % 2 == 0) {
-                refStore.remove(entry.getId(), null);
-                deleteCount++;
-            }
-            i++;
-        }
-
-        // Restart and verify that the right records were removed.
-        restartDataStoreManager();
-        dataStore = dataStoreManager.getStore(storeName);
-        dataStore.setDestination(destination);
-        refStore = dataStore.getStore(refName);
-
-        assertEquals(count - deleteCount, refStore.size());
-
-        loaded = refStore.load(null, null, count * 2);
-        assertEquals(count - deleteCount, loaded.size());
-
-        Iterator<CacheEntry> iterator = loaded.iterator();
-        for (int j = 0; j < count; j++) {
-            if (!(i % 2 == 0)) {
-                CacheEntry entry = iterator.next();
-                assertEquals(ce[j], entry);
-            }
-        }
-
-        // Verify that removing a store wipes out his data.
-        dataStore.removeStore(refStore);
-        refStore = dataStore.addStore(refName);
-        assertEquals(0, refStore.size());
-
-    }
-
-    public void testCreateDestroyDataStore() throws Exception {
-        String storeName = getName();
-
-        assertNull(dataStoreManager.getStore(storeName));
-        List<DataStore> storees = dataStoreManager.getStores();
-        assertTrue(storees.isEmpty());
-
-        DataStore dataStore = dataStoreManager.addStore(storeName);
-        assertNotNull(dataStore);
-        dataStore.setDestination(destination);
-
-        assertSame(dataStore, dataStoreManager.getStore(storeName));
-        storees = dataStoreManager.getStores();
-        assertEquals(1, storees.size());
-        assertTrue(storees.contains(dataStore));
-
-        // Verify that the data store create was persisted between restart.
-        restartDataStoreManager();
-
-        dataStore = dataStoreManager.getStore(storeName);
-        assertNotNull(dataStore);
-        assertEquals(storeName, dataStore.getName());
-        storees = dataStoreManager.getStores();
-        assertEquals(1, storees.size());
-        assertTrue(storees.contains(dataStore));
-
-        // Verify that the data store remove was persisted between restart.
-        dataStoreManager.removeStore(dataStore);
-        restartDataStoreManager();
-
-        dataStore = dataStoreManager.getStore(storeName);
-        assertNull(dataStore);
-        storees = dataStoreManager.getStores();
-        assertTrue(storees.isEmpty());
-    }
-
-    public void testCreateDestroyReferenceStore() throws Exception {
-
-        String dataName = getName();
-        String refName = getName() + "-ref";
-        DataStore manager = dataStoreManager.addStore(dataName);
-        manager.setDestination(destination);
-
-        assertNull(manager.getStore(refName));
-        List<ReferenceStore> storees = manager.getStores();
-        assertTrue(storees.isEmpty());
-
-        ReferenceStore dataStore = manager.addStore(refName);
-        assertNotNull(dataStore);
-
-        assertSame(dataStore, manager.getStore(refName));
-        storees = manager.getStores();
-        assertEquals(1, storees.size());
-        assertTrue(storees.contains(dataStore));
-
-        // Verify that the data store create was persisted between restart.
-        restartDataStoreManager();
-        manager = dataStoreManager.getStore(dataName);
-
-        dataStore = manager.getStore(refName);
-        assertNotNull(dataStore);
-        assertEquals(refName, dataStore.getName());
-        storees = manager.getStores();
-        assertEquals(1, storees.size());
-        assertTrue(storees.contains(dataStore));
-
-        // Verify that the data store remove was persisted between restart.
-        manager.removeStore(dataStore);
-        restartDataStoreManager();
-        manager = dataStoreManager.getStore(dataName);
-
-        dataStore = manager.getStore(refName);
-        assertNull(dataStore);
-        storees = manager.getStores();
-        assertTrue(storees.isEmpty());
-
-    }
-
-    /**
-     * Verify that store properties can be stored without any issues.
-     * 
-     * @throws Exception
-     */
-    public void testStoreProperties() throws Exception {
-
-        String storeName = getName();
-
-        // Make a relatively large property object..
-        Map<String, String> properties = new HashMap<String, String>();
-        for (int i = 0; i < 1000; i++) {
-            properties.put("key" + i, "value" + i);
-        }
-
-        DataStore dataStore = dataStoreManager.addStore(storeName);
-        dataStore.setDestination(destination);
-        dataStore.setProperties(properties);
-        assertEquals(properties, dataStore.getProperties());
-
-        // Restart and verify the the properties were preserved.
-        restartDataStoreManager();
-
-        dataStore = dataStoreManager.getStore(storeName);
-        assertEquals(properties, dataStore.getProperties());
-
-        dataStore.setProperties(null);
-
-        String refName = storeName + "-ref";
-        ReferenceStore refStore = dataStore.addStore(refName);
-        refStore.setProperties(properties);
-
-        restartDataStoreManager();
-
-        dataStore = dataStoreManager.getStore(storeName);
-        assertNull(dataStore.getProperties());
-        refStore = dataStore.getStore(refName);
-
-        assertEquals(properties, refStore.getProperties());
-
-    }
-
-    private Message createTextMessage(String text) throws MessageNotWriteableException {
+    protected Message createTextMessage(String text) throws MessageNotWriteableException
{
         ActiveMQTextMessage message = new ActiveMQTextMessage();
         message.setMessageId(new MessageId(producerId, ++msgIdGenerator));
         message.setDestination(destinationName);

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java
(added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java
Wed Mar 12 20:16:19 2008
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+
+/** 
+ * These are more complex tests that stress the Store API to see if the implementation 
+ * is thread safe.
+ *  
+ * @author chirino
+ */
+abstract public class StoreThreadingTestSupport extends StoreTestSupport {
+
+    AtomicBoolean stopping = new AtomicBoolean();
+    AtomicInteger counter = new AtomicInteger();
+    
+    class Publisher extends Thread {
+        private final ReferenceStore refStore;
+        private DataStore dataStore;
+        
+        AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+        AtomicReference<CacheEntry> lastCacheEntry = new AtomicReference<CacheEntry>();
+        private final int id;
+
+        public Publisher(ReferenceStore refStore, int id) {
+            super("Publisher: "+id);
+            this.refStore = refStore;
+            this.id = id;
+            this.dataStore = refStore.getDataStore();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while ( !stopping.get() ) {
+                    CacheEntry ce;
+                    synchronized(this) {
+                        long i = getNextId();
+                        ce = dataStore.addMessage(new Long(i), createTextMessage("Publisher:
"+id+", message: " + i), null);                        
+                    }
+                    assertNotNull(ce);
+                    assertNotNull(ce.getStore());
+                    assertNotNull(ce.getId());
+                    assertNotNull(ce.getMessage());
+                    refStore.addReference(ce);
+                    lastCacheEntry.set(ce);   
+                    counter.incrementAndGet();
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+                error.set(e);
+            }
+        }
+
+    }
+    
+
+    long nextId;
+    private long getNextId() {
+        return nextId++;
+    }
+    
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a ReferenceStore.
+     * 
+     * @throws Exception
+     */
+    public void testReferenceStoreCRD() throws Exception {
+        String storeName = getName();
+        String refName = getName() + "-ref";
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        ReferenceStore refStore = dataStore.addStore(refName);
+
+        // Start 2 publishers...
+        Publisher p1 = new Publisher(refStore, 1);
+        p1.start();
+        Publisher p2 = new Publisher(refStore, 2);
+        p2.start();
+        
+        long processed=0;
+        try {
+            int testDuration = 1000*5;
+            long start = System.currentTimeMillis();
+            while( true ) {
+                if( System.currentTimeMillis()-start > testDuration ) {
+                    break;
+                }
+                int published = counter.getAndSet(0);
+                if( published > 0 ) {
+                    List<CacheEntry> loaded = refStore.remove(null, null, published);
+                    assertEquals(published, loaded.size() );
+                    Thread.sleep(10);
+                    processed+=published;
+                } else {
+                    Thread.sleep(100);
+                }
+            }
+        } finally {
+            stopping.set(true);
+            p1.join();
+            p2.join();
+            System.out.println("Processed: "+processed+" messages.");            
+        }
+    }
+
+}

Copied: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java
(from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java?p2=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java&p1=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java&r1=635708&r2=636615&rev=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java
(original)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java
Wed Mar 12 20:16:19 2008
@@ -18,10 +18,10 @@
 
 import java.io.File;
 
-import org.apache.activemq.broker.router.store.StoreTestSupport;
+import org.apache.activemq.broker.router.store.StoreBasicTestSupport;
 import org.apache.activemq.broker.router.store.api.DataStoreManager;
 
-public class JournalStoreTest extends StoreTestSupport {
+public class JournalBasicStoreTest extends StoreBasicTestSupport {
 
     @Override
     protected DataStoreManager createDataStoreManager() throws Exception {

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java
(added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java
Wed Mar 12 20:16:19 2008
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.journal;
+
+import java.io.File;
+
+import org.apache.activemq.broker.router.store.StoreThreadingTestSupport;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+
+public class JournalThreadingStoreTest extends StoreThreadingTestSupport {
+
+    @Override
+    protected DataStoreManager createDataStoreManager() throws Exception {
+        JournalDataStoreManagerFactory factory = new JournalDataStoreManagerFactory();
+        factory.setDataDirectory(new File("target/data/" + getName()));
+//        factory.getEntityManagerProperties().put("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
+        return factory.createJournalDataStoreManager();
+    }
+
+}

Copied: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java
(from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java?p2=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java&p1=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java&r1=635708&r2=636615&rev=636615&view=diff
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java
(original)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java
Wed Mar 12 20:16:19 2008
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.broker.router.store.memory;
 
-import org.apache.activemq.broker.router.store.StoreTestSupport;
+import org.apache.activemq.broker.router.store.StoreBasicTestSupport;
 import org.apache.activemq.broker.router.store.api.DataStoreManager;
 
 /**
  * @author chirino
  */
-public class MemoryStoreTest extends StoreTestSupport {
+public class MemoryStoreBasicTest extends StoreBasicTestSupport {
 
     @Override
     protected DataStoreManager createDataStoreManager() throws Exception {

Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java
(added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java
Wed Mar 12 20:16:19 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store.memory;
+
+import org.apache.activemq.broker.router.store.StoreThreadingTestSupport;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+
+/**
+ * @author chirino
+ */
+public class MemoryStoreThreadingTest extends StoreThreadingTestSupport {
+
+    @Override
+    protected DataStoreManager createDataStoreManager() throws Exception {
+        MemoryDataStoreManager rc = new MemoryDataStoreManager();
+        return rc;
+    }
+
+    @Override
+    protected void restartDataStoreManager() throws Exception {
+        // Sorry we don't really support restarts..
+    }
+
+}



Mime
View raw message