activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1418686 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-brok...
Date Sat, 08 Dec 2012 15:21:17 GMT
Author: chirino
Date: Sat Dec  8 15:21:14 2012
New Revision: 1418686

URL: http://svn.apache.org/viewvc?rev=1418686&view=rev
Log:
Fixes AMQ-4215: Simplify PList interface and provide a LevelDB store implementation.

Added:
    activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java
      - copied, changed from r1418454, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java
    activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java
      - copied, changed from r1418454, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
    activemq/trunk/activemq-kahadb-store/pom.xml
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Sat Dec  8 15:21:14 2012
@@ -1530,6 +1530,16 @@ public class BrokerService implements Se
             if (!isPersistent()) {
                 return null;
             }
+
+            try {
+                PersistenceAdapter pa = getPersistenceAdapter();
+                if( pa!=null && pa instanceof PListStore) {
+                    return (PListStore) pa;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
             boolean result = true;
             boolean empty = true;
             try {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Sat Dec  8 15:21:14 2012
@@ -37,12 +37,14 @@ public class IndirectMessageReference im
     private boolean acked;
     /** Direct reference to the message */
     private final Message message;
+    private final MessageId messageId;
     
     /**
      * @param message
      */
     public IndirectMessageReference(final Message message) {
         this.message = message;
+        this.messageId = message.getMessageId().copy();
         message.getMessageId();
         message.getGroupID();
         message.getGroupSequence();
@@ -111,7 +113,7 @@ public class IndirectMessageReference im
     }
 
     public MessageId getMessageId() {
-        return message.getMessageId();
+        return messageId;
     }
 
     public Message.MessageDestination getRegionDestination() {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sat Dec  8 15:21:14 2012
@@ -100,6 +100,9 @@ public class TopicSubscription extends A
         if (isDuplicate(node)) {
             return;
         }
+        // Lets use an indirect reference so that we can associate a unique
+        // locator /w the message.
+        node = new IndirectMessageReference(node.getMessage());
         enqueueCounter.incrementAndGet();
         if (!isFull() && matched.isEmpty()) {
             // if maximumPendingMessages is set we will only discard messages which
@@ -540,7 +543,7 @@ public class TopicSubscription extends A
     }
 
     private void dispatch(final MessageReference node) throws IOException {
-        Message message = (Message)node;
+        Message message = node.getMessage();
         if (node != null) {
             node.incrementReferenceCount();
         }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Sat Dec  8 15:21:14 2012
@@ -278,7 +278,8 @@ public class FilePendingMessageCursor ex
                 systemUsage.getTempUsage().waitForSpace();
                 node.decrementReferenceCount();
                 ByteSequence bs = getByteSequence(node.getMessage());
-                getDiskList().addFirst(node.getMessageId().toString(), bs);
+                Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs);
+                node.getMessageId().setPlistLocator(locator);
 
             } catch (Exception e) {
                 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
@@ -335,7 +336,7 @@ public class FilePendingMessageCursor ex
         }
         if (!isDiskListEmpty()) {
             try {
-                getDiskList().remove(node.getMessageId().toString());
+                getDiskList().remove(node.getMessageId().getPlistLocator());
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
@@ -506,7 +507,9 @@ public class FilePendingMessageCursor ex
         public MessageReference next() {
             try {
                 PListEntry entry = iterator.next();
-                return getMessage(entry.getByteSequence());
+                Message message = getMessage(entry.getByteSequence());
+                message.getMessageId().setPlistLocator(entry.getLocator());
+                return message;
             } catch (IOException e) {
                 LOG.error("I/O error", e);
                 throw new RuntimeException(e);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java Sat Dec  8 15:21:14 2012
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.store;
 
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -25,30 +27,17 @@ import java.util.Iterator;
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public interface PList {
-    void setName(String name);
-
     String getName();
 
     void destroy() throws IOException;
 
-    void addLast(String id, ByteSequence bs) throws IOException;
-
-    void addFirst(String id, ByteSequence bs) throws IOException;
-
-    boolean remove(String id) throws IOException;
-
-    boolean remove(long position) throws IOException;
+    Object addFirst(String id, ByteSequence bs) throws IOException;
+    Object addLast(String id, ByteSequence bs) throws IOException;
 
-    PListEntry get(long position) throws IOException;
-
-    PListEntry getFirst() throws IOException;
-
-    PListEntry getLast() throws IOException;
+    boolean remove(Object position) throws IOException;
 
     boolean isEmpty();
-
     PListIterator iterator() throws IOException;
-
     long size();
 
     public interface PListIterator extends Iterator<PListEntry> {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java Sat Dec  8 15:21:14 2012
@@ -22,10 +22,12 @@ public class PListEntry {
 
     private final ByteSequence byteSequence;
     private final String entry;
+    private final Object locator;
 
-    public PListEntry(String entry, ByteSequence bs) {
+    public PListEntry(String entry, ByteSequence bs, Object locator) {
         this.entry = entry;
         this.byteSequence = bs;
+        this.locator = locator;
     }
 
     public ByteSequence getByteSequence() {
@@ -36,7 +38,11 @@ public class PListEntry {
         return this.entry;
     }
 
+    public Object getLocator() {
+        return locator;
+    }
+
     public PListEntry copy() {
-        return new PListEntry(this.entry, this.byteSequence);
+        return new PListEntry(this.entry, this.byteSequence, locator);
     }
 }

Copied: activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java (from r1418454, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java?p2=activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java&r1=1418454&r2=1418686&rev=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java Sat Dec  8 15:21:14 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.kahadb.plist;
+package org.apache.activemq.store;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -22,29 +22,25 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Vector;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.store.PList;
-import org.apache.activemq.store.PListEntry;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ByteSequence;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PListTest {
-    static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
-    private PListStoreImpl store;
-    private PListImpl plist;
+public abstract class PListTestSupport {
+    static final Logger LOG = LoggerFactory.getLogger(PListTestSupport.class);
+    private PListStore store;
+    private PList plist;
     final ByteSequence payload = new ByteSequence(new byte[400]);
     final String idSeed = new String("Seed" + new byte[1024]);
     final Vector<Throwable> exceptions = new Vector<Throwable>();
@@ -53,52 +49,73 @@ public class PListTest {
     @Test
     public void testAddLast() throws Exception {
         final int COUNT = 1000;
-        Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
+        LinkedList<ByteSequence> list = new LinkedList<ByteSequence>();
         for (int i = 0; i < COUNT; i++) {
             String test = new String("test" + i);
             ByteSequence bs = new ByteSequence(test.getBytes());
-            map.put(test, bs);
+            list.addLast(bs);
             plist.addLast(test, bs);
         }
         assertEquals(plist.size(), COUNT);
-        int count = 0;
-        for (ByteSequence bs : map.values()) {
+
+        PList.PListIterator actual = plist.iterator();
+        Iterator<ByteSequence> expected = list.iterator();
+        while (expected.hasNext()) {
+            ByteSequence bs = expected.next();
+            assertTrue(actual.hasNext());
+            PListEntry entry = actual.next();
             String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
-            PListEntry entry = plist.get(count);
             String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
                     entry.getByteSequence().getLength());
             assertEquals(origStr, plistString);
-            count++;
         }
+        assertFalse(actual.hasNext());
     }
 
    @Test
     public void testAddFirst() throws Exception {
-        final int COUNT = 1000;
-        Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
-        for (int i = 0; i < COUNT; i++) {
-            String test = new String("test" + i);
-            ByteSequence bs = new ByteSequence(test.getBytes());
-            map.put(test, bs);
-            plist.addFirst(test, bs);
-        }
-        assertEquals(plist.size(), COUNT);
-        long count = plist.size() - 1;
-        for (ByteSequence bs : map.values()) {
-            String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
-            PListEntry entry = plist.get(count);
-            String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
-                    entry.getByteSequence().getLength());
-            assertEquals(origStr, plistString);
-            count--;
-        }
-    }
+       final int COUNT = 1000;
+       LinkedList<ByteSequence> list = new LinkedList<ByteSequence>();
+       for (int i = 0; i < COUNT; i++) {
+           String test = new String("test" + i);
+           ByteSequence bs = new ByteSequence(test.getBytes());
+           list.addFirst(bs);
+           plist.addFirst(test, bs);
+       }
+       assertEquals(plist.size(), COUNT);
+
+       PList.PListIterator actual = plist.iterator();
+       Iterator<ByteSequence> expected = list.iterator();
+       while (expected.hasNext()) {
+           ByteSequence bs = expected.next();
+           assertTrue(actual.hasNext());
+           PListEntry entry = actual.next();
+           String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
+           String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
+                   entry.getByteSequence().getLength());
+           assertEquals(origStr, plistString);
+       }
+       assertFalse(actual.hasNext());
+   }
 
     @Test
     public void testRemove() throws IOException {
         doTestRemove(2000);
     }
 
+    private PListEntry getFirst(PList plist) throws IOException {
+        PList.PListIterator iterator = plist.iterator();
+        try {
+            if( iterator.hasNext() ) {
+                return iterator.next();
+            } else {
+                return null;
+            }
+        }finally {
+            iterator.release();
+        }
+    }
+
     protected void doTestRemove(final int COUNT) throws IOException {
         Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
         for (int i = 0; i < COUNT; i++) {
@@ -108,10 +125,10 @@ public class PListTest {
             plist.addLast(test, bs);
         }
         assertEquals(plist.size(), COUNT);
-        PListEntry entry = plist.getFirst();
+        PListEntry entry = getFirst(plist);
         while (entry != null) {
-            plist.remove(entry.getId());
-            entry = plist.getFirst();
+            plist.remove(entry.getLocator());
+            entry = getFirst(plist);
         }
         assertEquals(0,plist.size());
 
@@ -140,12 +157,12 @@ public class PListTest {
 
     @Test
     public void testRemoveSecond() throws Exception {
-        plist.addLast("First", new ByteSequence("A".getBytes()));
-        plist.addLast("Second", new ByteSequence("B".getBytes()));
+        Object first = plist.addLast("First", new ByteSequence("A".getBytes()));
+        Object second = plist.addLast("Second", new ByteSequence("B".getBytes()));
 
-        assertTrue(plist.remove("Second"));
-        assertTrue(plist.remove("First"));
-        assertFalse(plist.remove("doesNotExist"));
+        assertTrue(plist.remove(second));
+        assertTrue(plist.remove(first));
+        assertFalse(plist.remove(first));
     }
 
     @Test
@@ -161,12 +178,12 @@ public class PListTest {
 
     @Test
     public void testRemoveSecondPosition() throws Exception {
-        plist.addLast("First", new ByteSequence("A".getBytes()));
-        plist.addLast("Second", new ByteSequence("B".getBytes()));
+        Object first = plist.addLast("First", new ByteSequence("A".getBytes()));
+        Object second = plist.addLast("Second", new ByteSequence("B".getBytes()));
 
-        assertTrue(plist.remove(1));
-        assertTrue(plist.remove(0));
-        assertFalse(plist.remove(0));
+        assertTrue(plist.remove(second));
+        assertTrue(plist.remove(first));
+        assertFalse(plist.remove(first));
     }
 
     @Test
@@ -175,11 +192,8 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStoreImpl();
-        store.setCleanupInterval(400);
+        store = createConcurrentAddRemovePListStore();
         store.setDirectory(directory);
-        store.setJournalMaxFileLength(1024*5);
-        store.setLazyInit(false);
         store.start();
 
         final ByteSequence payload = new ByteSequence(new byte[1024*2]);
@@ -207,9 +221,9 @@ public class PListTest {
                         PList candidate = lists[i%numLists];
                         Thread.currentThread().setName("ALRF:"+candidate.getName());
                         synchronized (plistLocks(candidate)) {
-                            candidate.addLast(String.valueOf(i), payload);
-                            candidate.getFirst();
-                            assertTrue(candidate.remove(String.valueOf(i)));
+                            Object last = candidate.addLast(String.valueOf(i), payload);
+                            getFirst(candidate);
+                            assertTrue(candidate.remove(last));
                         }
                     }
                 } catch (Exception error) {
@@ -231,9 +245,9 @@ public class PListTest {
                         PList candidate = lists[i%numLists];
                         Thread.currentThread().setName("ALRF:"+candidate.getName());
                          synchronized (plistLocks(candidate)) {
-                            candidate.addLast(String.valueOf(i), payload);
-                            candidate.getFirst();
-                            assertTrue(candidate.remove(String.valueOf(i)));
+                             Object last = candidate.addLast(String.valueOf(i), payload);
+                             getFirst(candidate);
+                            assertTrue(candidate.remove(last));
                          }
                     }
                 } catch (Exception error) {
@@ -259,13 +273,15 @@ public class PListTest {
         assertTrue("finished ok", finishedInTime);
     }
 
+    protected abstract PListStore createConcurrentAddRemovePListStore();
+
     @Test
     public void testConcurrentAddLast() throws Exception {
         File directory = store.getDirectory();
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStoreImpl();
+        store = createPListStore();
         store.setDirectory(directory);
         store.start();
 
@@ -274,15 +290,15 @@ public class PListTest {
         final int iterations = 1000;
         executor = Executors.newFixedThreadPool(100);
         for (int i=0; i<numThreads; i++) {
-            new Job(i, PListTest.TaskType.ADD, iterations).run();
+            new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
         }
 
         for (int i=0; i<numThreads; i++) {
-            executor.execute(new Job(i, PListTest.TaskType.ITERATE, iterations));
+            executor.execute(new Job(i, PListTestSupport.TaskType.ITERATE, iterations));
         }
 
         for (int i=0; i<100; i++) {
-            executor.execute(new Job(i+20, PListTest.TaskType.ADD, 100));
+            executor.execute(new Job(i+20, PListTestSupport.TaskType.ADD, 100));
         }
 
         executor.shutdown();
@@ -296,16 +312,16 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStoreImpl();
+        store = createPListStore();
         store.setDirectory(directory);
         store.start();
 
         for (int i=0;i<2000; i++) {
-            new Job(i, PListTest.TaskType.ADD, 5).run();
+            new Job(i, PListTestSupport.TaskType.ADD, 5).run();
 
         }
-        LOG.info("After Load index file: " + store.pageFile.getFile().length());
-        LOG.info("After remove index file: " + store.pageFile.getFile().length());
+//        LOG.info("After Load index file: " + store.pageFile.getFile().length());
+//        LOG.info("After remove index file: " + store.pageFile.getFile().length());
     }
 
     @Test
@@ -314,11 +330,8 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStoreImpl();
+        store = createConcurrentAddRemoveWithPreloadPListStore();
         store.setDirectory(directory);
-        store.setJournalMaxFileLength(1024*5);
-        store.setCleanupInterval(5000);
-        store.setIndexWriteBatchSize(500);
         store.start();
 
         final int iterations = 500;
@@ -329,21 +342,21 @@ public class PListTest {
         // create/delete
         LOG.info("create");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.CREATE, iterations).run();
+            new Job(i, PListTestSupport.TaskType.CREATE, iterations).run();
         }
 
         LOG.info("delete");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.DELETE, iterations).run();
+            new Job(i, PListTestSupport.TaskType.DELETE, iterations).run();
         }
 
         LOG.info("fill");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.ADD, iterations).run();
+            new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
         }
         LOG.info("remove");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.REMOVE, iterations).run();
+            new Job(i, PListTestSupport.TaskType.REMOVE, iterations).run();
         }
 
         LOG.info("check empty");
@@ -353,18 +366,18 @@ public class PListTest {
 
         LOG.info("delete again");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.DELETE, iterations).run();
+            new Job(i, PListTestSupport.TaskType.DELETE, iterations).run();
         }
 
         LOG.info("fill again");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.ADD, iterations).run();
+            new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
         }
 
         LOG.info("parallel add and remove");
         executor = Executors.newFixedThreadPool(numLists*2);
         for (int i=0; i<numLists*2; i++) {
-            executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
+            executor.execute(new Job(i, i>=numLists ? PListTestSupport.TaskType.ADD : PListTestSupport.TaskType.REMOVE, iterations));
         }
 
         executor.shutdown();
@@ -374,6 +387,8 @@ public class PListTest {
         assertTrue("finished ok", finishedInTime);
     }
 
+    protected abstract PListStore createConcurrentAddRemoveWithPreloadPListStore();
+
     // for non determinant issues, increasing this may help diagnose
     final int numRepeats = 1;
 
@@ -396,9 +411,7 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStoreImpl();
-        store.setIndexEnablePageCaching(enablePageCache);
-        store.setIndexPageSize(2*1024);
+        store = createConcurrentAddIterateRemovePListStore(enablePageCache);
         store.setDirectory(directory);
         store.start();
 
@@ -407,12 +420,12 @@ public class PListTest {
 
         LOG.info("create");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.CREATE, iterations).run();
+            new Job(i, PListTestSupport.TaskType.CREATE, iterations).run();
         }
 
         LOG.info("fill");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.ADD, iterations).run();
+            new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
         }
 
         LOG.info("parallel add and remove");
@@ -421,7 +434,7 @@ public class PListTest {
         final int numConsumer = 10;
         for (int i=0; i<numLists; i++) {
             for (int j=0; j<numProducer; j++) {
-                executor.execute(new Job(i, PListTest.TaskType.ADD, iterations*2));
+                executor.execute(new Job(i, PListTestSupport.TaskType.ADD, iterations*2));
             }
             for (int k=0;k<numConsumer; k++) {
                 executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations/4));
@@ -429,7 +442,7 @@ public class PListTest {
         }
 
          for (int i=numLists; i<numLists*10; i++) {
-            executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
+            executor.execute(new Job(i, PListTestSupport.TaskType.ADD, iterations));
          }
 
         executor.shutdown();
@@ -439,19 +452,17 @@ public class PListTest {
         assertTrue("test did not  timeout ", shutdown);
     }
 
+    protected abstract PListStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache);
+
+    @Ignore("Takes too long.. might have broken it.")
     @Test
     public void testConcurrentAddIterate() throws Exception {
         File directory = store.getDirectory();
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStoreImpl();
-        store.setIndexPageSize(2*1024);
-        store.setJournalMaxFileLength(1024*1024);
+        store = createConcurrentAddIteratePListStore();
         store.setDirectory(directory);
-        store.setCleanupInterval(-1);
-        store.setIndexEnablePageCaching(false);
-        store.setIndexWriteBatchSize(100);
         store.start();
 
         final int iterations = 250;
@@ -459,7 +470,7 @@ public class PListTest {
 
         LOG.info("create");
         for (int i=0; i<numLists;i++) {
-            new Job(i, PListTest.TaskType.CREATE, iterations).run();
+            new Job(i, PListTestSupport.TaskType.CREATE, iterations).run();
         }
 
         LOG.info("parallel add and iterate");
@@ -471,7 +482,7 @@ public class PListTest {
         final int numConsumer = 100;
         for (int i=0; i<numLists; i++) {
             for (int j=0; j<numProducer; j++) {
-                executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
+                executor.execute(new Job(i, PListTestSupport.TaskType.ADD, iterations));
             }
             for (int k=0;k<numConsumer; k++) {
                 executor.execute(new Job(i, TaskType.ITERATE, iterations*2));
@@ -483,10 +494,13 @@ public class PListTest {
         boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
         assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
         assertTrue("test did not  timeout ", shutdown);
-        LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
+//        LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
     }
 
+    abstract protected PListStore createConcurrentAddIteratePListStore();
+
     enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
+    ConcurrentHashMap<String, Object> entries = new ConcurrentHashMap<String, Object>();
 
     class Job implements Runnable {
 
@@ -504,7 +518,7 @@ public class PListTest {
         public void run() {
             final String threadName = Thread.currentThread().getName();
             try {
-                PListImpl plist = null;
+                PList plist = null;
                 switch (task) {
                     case CREATE:
                         Thread.currentThread().setName("C:"+id);
@@ -522,7 +536,8 @@ public class PListTest {
                         for (int j = 0; j < iterations; j++) {
                             synchronized (plistLocks(plist)) {
                                 if (exceptions.isEmpty()) {
-                                    plist.addLast ("PL>"  + id + idSeed + "-" + j, payload);
+                                    String key = "PL>" + id + idSeed + "-" + j;
+                                    entries.put(key, plist.addLast(key, payload));
                                 } else {
                                     break;
                                 }
@@ -539,7 +554,11 @@ public class PListTest {
                         synchronized (plistLocks(plist)) {
 
                             for (int j = iterations -1; j >= 0; j--) {
-                                plist.remove("PL>"  + id + idSeed + "-" + j);
+                                String key = "PL>" + id + idSeed + "-" + j;
+                                Object position = entries.remove(key);
+                                if( position!=null ) {
+                                    plist.remove(position);
+                                }
                                 if (j > 0 && j % (iterations / 2) == 0) {
                                     LOG.info("Job-" + id + " Done remove: " + j);
                                 }
@@ -627,12 +646,14 @@ public class PListTest {
     }
 
     protected void startStore(File directory) throws Exception {
-        store = new PListStoreImpl();
+        store = createPListStore();
         store.setDirectory(directory);
         store.start();
         plist = store.getPList("main");
     }
 
+    abstract protected PListStore createPListStore();
+
     @After
     public void tearDown() throws Exception {
         if (executor != null) {

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java Sat Dec  8 15:21:14 2012
@@ -35,6 +35,7 @@ public class MessageId implements DataSt
 
     private transient AtomicReference<Object> dataLocator = new AtomicReference<Object>();
     private transient Object entryLocator;
+    private transient Object plistLocator;
 
     public MessageId() {
         this.producerId = new ProducerId();
@@ -153,6 +154,8 @@ public class MessageId implements DataSt
         copy.key = key;
         copy.brokerSequenceId = brokerSequenceId;
         copy.dataLocator = new AtomicReference<Object>(dataLocator != null ? dataLocator.get() : null);
+        copy.entryLocator = entryLocator;
+        copy.plistLocator = plistLocator;
         return copy;
     }
 
@@ -192,4 +195,12 @@ public class MessageId implements DataSt
     public void setEntryLocator(Object entryLocator) {
         this.entryLocator = entryLocator;
     }
+
+    public Object getPlistLocator() {
+        return plistLocator;
+    }
+
+    public void setPlistLocator(Object plistLocator) {
+        this.plistLocator = plistLocator;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java Sat Dec  8 15:21:14 2012
@@ -23,10 +23,7 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.derby.iapi.jdbc.BrokeredStatement;
 import org.junit.After;
@@ -52,6 +49,7 @@ public class PriorityNetworkDispatchPoli
         info.setConsumerId(id);
         info.setNetworkSubscription(true);
         info.setNetworkConsumerPath(new ConsumerId[]{id});
+        node.setMessageId(new MessageId("test:1:1:1:1"));
     }
 
     @After

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java Sat Dec  8 15:21:14 2012
@@ -46,6 +46,7 @@ public class FilePendingMessageCursorTes
 
     private void createBrokerWithTempStoreLimit() throws Exception {
         brokerService = new BrokerService();
+        brokerService.setUseJmx(false);
         SystemUsage usage = brokerService.getSystemUsage();
         usage.getTempUsage().setLimit(1025*1024*15);
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java Sat Dec  8 15:21:14 2012
@@ -22,6 +22,7 @@ public class KahaDBFilePendingMessageCur
     @Test
     public void testAddRemoveAddIndexSize() throws Exception {
         brokerService = new BrokerService();
+        brokerService.setUseJmx(false);
         SystemUsage usage = brokerService.getSystemUsage();
         usage.getMemoryUsage().setLimit(1024*150);
         String body = new String(new byte[1024]);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Sat Dec  8 15:21:14 2012
@@ -50,6 +50,19 @@ public class PListTest {
     final Vector<Throwable> exceptions = new Vector<Throwable>();
     ExecutorService executor;
 
+    private PListEntry getFirst(PList plist) throws IOException {
+        PList.PListIterator iterator = plist.iterator();
+        try {
+            if( iterator.hasNext() ) {
+                return iterator.next();
+            } else {
+                return null;
+            }
+        }finally {
+            iterator.release();
+        }
+    }
+
     @Test
     public void testAddLast() throws Exception {
         final int COUNT = 1000;
@@ -208,7 +221,7 @@ public class PListTest {
                         Thread.currentThread().setName("ALRF:"+candidate.getName());
                         synchronized (plistLocks(candidate)) {
                             candidate.addLast(String.valueOf(i), payload);
-                            candidate.getFirst();
+                            getFirst(candidate);
                             assertTrue(candidate.remove(String.valueOf(i)));
                         }
                     }
@@ -232,7 +245,7 @@ public class PListTest {
                         Thread.currentThread().setName("ALRF:"+candidate.getName());
                          synchronized (plistLocks(candidate)) {
                             candidate.addLast(String.valueOf(i), payload);
-                            candidate.getFirst();
+                            getFirst(candidate);
                             assertTrue(candidate.remove(String.valueOf(i)));
                          }
                     }

Modified: activemq/trunk/activemq-kahadb-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/pom.xml?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/pom.xml (original)
+++ activemq/trunk/activemq-kahadb-store/pom.xml Sat Dec  8 15:21:14 2012
@@ -124,6 +124,12 @@
     <!-- Testing Dependencies            -->
     <!-- =============================== -->
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java Sat Dec  8 15:21:14 2012
@@ -26,6 +26,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.store.PList;
 import org.apache.activemq.store.PListEntry;
 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
@@ -34,6 +36,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +54,6 @@ public class PListImpl extends ListIndex
         setValueMarshaller(LocationMarshaller.INSTANCE);
     }
 
-    @Override
     public void setName(String name) {
         this.name = name;
     }
@@ -81,8 +83,20 @@ public class PListImpl extends ListIndex
         }
     }
 
+    class Locator {
+        final String id;
+
+        Locator(String id) {
+            this.id = id;
+        }
+
+        PListImpl plist() {
+            return PListImpl.this;
+        }
+    }
+
     @Override
-    public void addLast(final String id, final ByteSequence bs) throws IOException {
+    public Object addLast(final String id, final ByteSequence bs) throws IOException {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -91,10 +105,11 @@ public class PListImpl extends ListIndex
                 }
             });
         }
+        return new Locator(id);
     }
 
     @Override
-    public void addFirst(final String id, final ByteSequence bs) throws IOException {
+    public Object addFirst(final String id, final ByteSequence bs) throws IOException {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -103,9 +118,17 @@ public class PListImpl extends ListIndex
                 }
             });
         }
+        return new Locator(id);
     }
 
     @Override
+    public boolean remove(final Object l) throws IOException {
+        Locator locator = (Locator) l;
+        assert locator!=null;
+        assert locator.plist()==this;
+        return remove(locator.id);
+    }
+
     public boolean remove(final String id) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
@@ -118,7 +141,6 @@ public class PListImpl extends ListIndex
         return result.get();
     }
 
-    @Override
     public boolean remove(final long position) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
@@ -138,7 +160,6 @@ public class PListImpl extends ListIndex
         return result.get();
     }
 
-    @Override
     public PListEntry get(final long position) throws IOException {
         PListEntry result = null;
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@@ -152,12 +173,11 @@ public class PListImpl extends ListIndex
         }
         if (ref.get() != null) {
             ByteSequence bs = this.store.getPayload(ref.get().getValue());
-            result = new PListEntry(ref.get().getKey(), bs);
+            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
         }
         return result;
     }
 
-    @Override
     public PListEntry getFirst() throws IOException {
         PListEntry result = null;
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@@ -170,12 +190,11 @@ public class PListImpl extends ListIndex
         }
         if (ref.get() != null) {
             ByteSequence bs = this.store.getPayload(ref.get().getValue());
-            result = new PListEntry(ref.get().getKey(), bs);
+            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
         }
         return result;
     }
 
-    @Override
     public PListEntry getLast() throws IOException {
         PListEntry result = null;
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@@ -188,7 +207,7 @@ public class PListImpl extends ListIndex
         }
         if (ref.get() != null) {
             ByteSequence bs = this.store.getPayload(ref.get().getValue());
-            result = new PListEntry(ref.get().getKey(), bs);
+            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
         }
         return result;
     }
@@ -230,7 +249,7 @@ public class PListImpl extends ListIndex
                 e.initCause(unexpected);
                 throw e;
             }
-            return new PListEntry(entry.getKey(), bs);
+            return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey()));
         }
 
         @Override

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java Sat Dec  8 15:21:14 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.JournaledStore;
 import org.apache.activemq.store.PList;
 import org.apache.activemq.store.PListStore;
@@ -31,6 +32,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.util.*;
+import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java?rev=1418686&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java Sat Dec  8 15:21:14 2012
@@ -0,0 +1,52 @@
+package org.apache.activemq.store.kahadb.plist;
+
+import org.apache.activemq.store.PListStore;
+import org.apache.activemq.store.PListTestSupport;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PListImplTest extends PListTestSupport {
+
+
+    @Override
+    protected PListStoreImpl createPListStore() {
+        return new PListStoreImpl();
+    }
+
+    protected PListStore createConcurrentAddIteratePListStore() {
+        PListStoreImpl store = createPListStore();
+        store.setIndexPageSize(2 * 1024);
+        store.setJournalMaxFileLength(1024 * 1024);
+        store.setCleanupInterval(-1);
+        store.setIndexEnablePageCaching(false);
+        store.setIndexWriteBatchSize(100);
+        return store;
+    }
+
+    @Override
+    protected PListStore createConcurrentAddRemovePListStore() {
+        PListStoreImpl store = createPListStore();
+        store.setCleanupInterval(400);
+        store.setJournalMaxFileLength(1024*5);
+        store.setLazyInit(false);
+        return store;
+    }
+
+    @Override
+    protected PListStore createConcurrentAddRemoveWithPreloadPListStore() {
+        PListStoreImpl store = createPListStore();
+        store.setJournalMaxFileLength(1024*5);
+        store.setCleanupInterval(5000);
+        store.setIndexWriteBatchSize(500);
+        return store;
+    }
+
+    @Override
+    protected PListStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache) {
+        PListStoreImpl store = createPListStore();
+        store.setIndexEnablePageCaching(enablePageCache);
+        store.setIndexPageSize(2*1024);
+        return store;
+    }
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Sat Dec  8 15:21:14 2012
@@ -32,6 +32,15 @@ import util.TimeMetric
 import java.util.HashMap
 import collection.mutable.{HashSet, ListBuffer}
 import org.apache.activemq.util.ByteSequence
+import org.apache.activemq.leveldb.QueueEntryRecord
+import util.TimeMetric
+import org.apache.activemq.leveldb.SubAckRecord
+import scala.Some
+import org.apache.activemq.leveldb.UowManagerConstants.QueueEntryKey
+import org.apache.activemq.leveldb.CountDownFuture
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.MessageRecord
+import org.apache.activemq.leveldb.DurableSubscription
 
 case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
   var locator:(Long, Int) = _
@@ -359,6 +368,7 @@ class DelayableUOW(val manager:DBManager
 class DBManager(val parent:LevelDBStore) {
 
   var lastCollectionKey = new AtomicLong(0)
+  var lastPListKey = new AtomicLong(0)
   val client:LevelDBClient = parent.createClient
 
   def writeExecutor = client.writeExecutor
@@ -658,10 +668,10 @@ class DBManager(val parent:LevelDBStore)
   }
 
   def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
-    parent.createQueueMessageStore(dest, createStore(dest, QUEUE_COLLECTION_TYPE))
+    parent.createQueueMessageStore(dest, createCollection(utf8(dest.getQualifiedName), QUEUE_COLLECTION_TYPE))
   }
   def destroyQueueStore(key:Long) = writeExecutor.sync {
-      client.removeCollection(key)
+    client.removeCollection(key)
   }
 
   def getLogAppendPosition = writeExecutor.sync {
@@ -697,14 +707,14 @@ class DBManager(val parent:LevelDBStore)
   }
 
   def createTopicStore(dest:ActiveMQTopic) = {
-    var key = createStore(dest, TOPIC_COLLECTION_TYPE)
+    var key = createCollection(utf8(dest.getQualifiedName), TOPIC_COLLECTION_TYPE)
     parent.createTopicMessageStore(dest, key)
   }
 
-  def createStore(destination:ActiveMQDestination, collectionType:Int) = {
+  def createCollection(name:Buffer, collectionType:Int) = {
     val collection = new CollectionRecord.Bean()
     collection.setType(collectionType)
-    collection.setMeta(utf8(destination.getQualifiedName))
+    collection.setMeta(name)
     collection.setKey(lastCollectionKey.incrementAndGet())
     val buffer = collection.freeze()
     buffer.toFramedBuffer // eager encode the record.
@@ -714,19 +724,10 @@ class DBManager(val parent:LevelDBStore)
     collection.getKey
   }
 
-  def createTransactionContainer(name:XATransactionId) = {
-    val collection = new CollectionRecord.Bean()
-    collection.setType(TRANSACTION_COLLECTION_TYPE)
-    var packet = parent.wireFormat.marshal(name)
-    collection.setMeta(new Buffer(packet.data, packet.offset, packet.length))
-    collection.setKey(lastCollectionKey.incrementAndGet())
-    val buffer = collection.freeze()
-    buffer.toFramedBuffer // eager encode the record.
-    writeExecutor.sync {
-      client.addCollection(buffer)
-    }
-    collection.getKey
-  }
+  def buffer(packet:ByteSequence) = new Buffer(packet.data, packet.offset, packet.length)
+
+  def createTransactionContainer(id:XATransactionId) =
+    createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE)
 
   def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
     client.removeCollection(key)
@@ -773,6 +774,18 @@ class DBManager(val parent:LevelDBStore)
     lastCollectionKey.set(last)
   }
 
+  def createPList(name:String):LevelDBStore#LevelDBPList = {
+    parent.createPList(name, lastPListKey.incrementAndGet())
+  }
+
+  def destroyPList(key:Long) = writeExecutor.sync {
+    client.removePlist(key)
+  }
+
+  def plistPut(key:Array[Byte], value:Array[Byte]) = client.plistPut(key, value)
+  def plistGet(key:Array[Byte]) = client.plistGet(key)
+  def plistDelete(key:Array[Byte]) = client.plistDelete(key)
+  def plistIterator = client.plistIterator
 
   def getMessage(x: MessageId):Message = {
     val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Sat Dec  8 15:21:14 2012
@@ -27,16 +27,18 @@ import org.iq80.leveldb._
 
 import org.fusesource.hawtdispatch._
 import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord}
-import util._
+import org.apache.activemq.leveldb.util._
 import java.util.concurrent._
 import org.fusesource.hawtbuf._
 import java.io.{ObjectInputStream, ObjectOutputStream, File}
 import scala.Option._
-import org.apache.activemq.command.{MessageAck, DataStructure, Message}
+import org.apache.activemq.command.{MessageAck, Message}
 import org.apache.activemq.util.ByteSequence
-import org.apache.activemq.leveldb.RecordLog.LogInfo
 import java.text.SimpleDateFormat
 import java.util.{Date, Collections}
+import org.apache.activemq.leveldb.util.TimeMetric
+import org.apache.activemq.leveldb.RecordLog.LogInfo
+import scala.Some
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -58,6 +60,8 @@ object LevelDBClient extends Log {
     override def shutdownNow = Collections.emptyList[Runnable]
   }
 
+  val PLIST_WRITE_OPTIONS = new WriteOptions().sync(false)
+
   final val DIRTY_INDEX_KEY = bytes(":dirty")
   final val LOG_REF_INDEX_KEY = bytes(":log-refs")
   final val COLLECTION_META_KEY = bytes(":collection-meta")
@@ -112,6 +116,18 @@ object LevelDBClient extends Log {
     (in.readVarLong(), in.readVarInt())
   }
 
+  def encodeLongLong(a1:Long, a2:Long) = {
+    val out = new DataByteArrayOutputStream(8)
+    out.writeLong(a1)
+    out.writeLong(a2)
+    out.toBuffer
+  }
+
+  def decodeLongLong(bytes:Array[Byte]):(Long, Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readLong(), in.readLong())
+  }
+
   def encodeLong(a1:Long) = {
     val out = new DataByteArrayOutputStream(8)
     out.writeLong(a1)
@@ -404,6 +420,7 @@ class LevelDBClient(store: LevelDBStore)
   var log:RecordLog = _
 
   var index:RichDB = _
+  var plist:RichDB = _
   var indexOptions:Options = _
 
   var lastIndexSnapshotPos:Long = _
@@ -414,6 +431,7 @@ class LevelDBClient(store: LevelDBStore)
   
   val collectionMeta = HashMap[Long, CollectionMeta]()
 
+  def plistIndexFile = directory / ("plist"+INDEX_SUFFIX)
   def dirtyIndexFile = directory / ("dirty"+INDEX_SUFFIX)
   def tempIndexFile = directory / ("temp"+INDEX_SUFFIX)
   def snapshotIndexFile(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX)
@@ -526,6 +544,11 @@ class LevelDBClient(store: LevelDBStore)
 
     retry {
 
+      // Setup the plist index.
+      plistIndexFile.recursiveDelete
+      plistIndexFile.mkdirs()
+      plist = new RichDB(factory.open(plistIndexFile, indexOptions));
+
       // Delete the dirty indexes
       dirtyIndexFile.recursiveDelete
       dirtyIndexFile.mkdirs()
@@ -1275,4 +1298,23 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
+  def removePlist(collectionKey: Long) = {
+    val entryKeyPrefix = encodeLong(collectionKey)
+    collectionMeta.remove(collectionKey)
+    retry {
+      val ro = new ReadOptions
+      ro.fillCache(false)
+      ro.verifyChecksums(false)
+      plist.cursorPrefixed(entryKeyPrefix, ro) { (key, value)=>
+        plist.delete(key)
+        true
+      }
+    }
+  }
+
+  def plistPut(key:Array[Byte], value:Array[Byte]) = plist.put(key, value, PLIST_WRITE_OPTIONS)
+  def plistDelete(key:Array[Byte]) = plist.delete(key, PLIST_WRITE_OPTIONS)
+  def plistGet(key:Array[Byte]) = plist.get(key)
+  def plistIterator = plist.db.iterator()
+
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1418686&r1=1418685&r2=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Sat Dec  8 15:21:14 2012
@@ -23,8 +23,8 @@ import org.apache.activemq.openwire.Open
 import org.apache.activemq.usage.SystemUsage
 import java.io.File
 import java.io.IOException
-import java.util.concurrent.{CountDownLatch, ExecutionException, Future}
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{ExecutionException, Future}
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
 import reflect.BeanProperty
 import org.apache.activemq.store._
 import java.util._
@@ -34,6 +34,9 @@ import javax.management.ObjectName
 import org.apache.activemq.broker.jmx.AnnotatedMBean
 import org.apache.activemq.util._
 import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log}
+import org.apache.activemq.store.PList.PListIterator
+import java.lang
+import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream, Buffer}
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -111,7 +114,7 @@ class LevelDBStoreView(val store:LevelDB
 
 import LevelDBStore._
 
-class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
+class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore with PListStore {
 
   final val wireFormat = new OpenWireFormat
   final val db = new DBManager(this)
@@ -157,6 +160,7 @@ class LevelDBStore extends LockableServi
   val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]()
   val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
   val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
+  val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
 
   def init() = {}
 
@@ -299,7 +303,9 @@ class LevelDBStore extends LockableServi
           if( prepared ) {
             uow.dequeue(xacontainer_id, message.getMessageId)
           }
-          message.setMessageId(message.getMessageId.copy())
+          var copy = message.getMessageId.copy()
+          copy.setEntryLocator(null)
+          message.setMessageId(copy)
           store.doAdd(uow, message, delay)
         }
 
@@ -370,7 +376,6 @@ class LevelDBStore extends LockableServi
     preCommit.run()
     transactions.remove(txid) match {
       case None=>
-        println("The transaction does not exist")
         postCommit.run()
       case Some(tx)=>
         val done = new CountDownLatch(1)
@@ -433,6 +438,31 @@ class LevelDBStore extends LockableServi
     }
   }
 
+
+  def getPList(name: String): PList = {
+    this.synchronized(plists.get(name)).getOrElse(db.createPList(name))
+  }
+
+  def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = {
+    var rc = new LevelDBPList(name, key)
+    this.synchronized {
+      plists.put(name, rc)
+    }
+    rc
+  }
+
+  def removePList(name: String): Boolean = {
+    plists.remove(name) match {
+      case Some(list)=>
+        db.destroyPList(list.key)
+        list.listSize.set(0)
+        true
+      case None =>
+        false
+    }
+  }
+
+
   def createMessageStore(destination: ActiveMQDestination):LevelDBStore#LevelDBMessageStore = {
     destination match {
       case destination:ActiveMQQueue =>
@@ -531,6 +561,7 @@ class LevelDBStore extends LockableServi
 
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+      message.getMessageId.setEntryLocator(null)
       if(  message.getTransactionId!=null ) {
         transaction(message.getTransactionId).add(this, message, delay)
         DONE
@@ -765,6 +796,101 @@ class LevelDBStore extends LockableServi
     }
 
   }
+  class LevelDBPList(val name: String, val key: Long) extends PList {
+    import LevelDBClient._
+
+    val lastSeq = new AtomicLong(Long.MaxValue/2)
+    val firstSeq = new AtomicLong(lastSeq.get+1)
+    val listSize = new AtomicLong(0)
+
+    def getName: String = name
+    def destroy() = {
+      removePList(name)
+    }
+
+    def addFirst(id: String, bs: ByteSequence): AnyRef = {
+      var pos = lastSeq.decrementAndGet()
+      add(pos, id, bs)
+      listSize.incrementAndGet()
+      new lang.Long(pos)
+    }
+
+    def addLast(id: String, bs: ByteSequence): AnyRef = {
+      var pos = lastSeq.incrementAndGet()
+      add(pos, id, bs)
+      listSize.incrementAndGet()
+      new lang.Long(pos)
+    }
+
+    def add(pos:Long, id: String, bs: ByteSequence) = {
+      val encoded_key = encodeLongLong(key, pos)
+      val encoded_id = new UTF8Buffer(id)
+      val os = new DataByteArrayOutputStream(2+encoded_id.length+bs.length)
+      os.writeShort(encoded_id.length)
+      os.write(encoded_id.data, encoded_id.offset, encoded_id.length)
+      os.write(bs.getData, bs.getOffset, bs.getLength)
+      db.plistPut(encoded_key, os.toBuffer.toByteArray)
+    }
+
+    def remove(position: AnyRef): Boolean = {
+      val pos = position.asInstanceOf[lang.Long].longValue()
+      val encoded_key = encodeLongLong(key, pos)
+      db.plistGet(encoded_key) match {
+        case Some(value) =>
+          db.plistDelete(encoded_key)
+          listSize.decrementAndGet()
+          true
+        case None =>
+          false
+      }
+    }
+
+    def isEmpty = size()==0
+    def size(): Long = listSize.get()
+
+    def iterator() = new PListIterator() {
+      val prefix = LevelDBClient.encodeLong(key)
+      var dbi = db.plistIterator
+      var last_key:Array[Byte] = _
+
+      dbi.seek(prefix);
+
+
+      def hasNext: Boolean = dbi!=null && dbi.hasNext && dbi.peekNext.getKey.startsWith(prefix)
+      def next() = {
+        if ( dbi==null || !dbi.hasNext ) {
+          throw new NoSuchElementException();
+        }
+        val n = dbi.peekNext();
+        last_key = n.getKey
+        val (k, pos) = decodeLongLong(last_key)
+        if( k!=key ) {
+          throw new NoSuchElementException();
+        }
+        var value = n.getValue
+        val is = new org.fusesource.hawtbuf.DataByteArrayInputStream(value)
+        val id = is.readBuffer(is.readShort()).utf8().toString
+        val data = new ByteSequence(value, is.getPos, value.length-is.getPos)
+        dbi.next()
+        new PListEntry(id, data, pos)
+      }
+
+      def release() = {
+        dbi.close()
+        dbi = null
+      }
+
+      def remove() = {
+        if( last_key==null ) {
+          throw new NoSuchElementException();
+        }
+        db.plistDelete(last_key)
+        listSize.decrementAndGet()
+        last_key = null
+      }
+    }
+
+  }
 
   ///////////////////////////////////////////////////////////////////////////
   // The following methods actually have nothing to do with JMS txs... It's more like

Copied: activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java (from r1418454, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java?p2=activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java&p1=activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java&r1=1418454&r2=1418686&rev=1418686&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java (original)
+++ activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java Sat Dec  8 15:21:14 2012
@@ -14,44 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store;
+package org.apache.activemq.leveldb;
 
-import org.apache.activemq.util.ByteSequence;
-
-import java.io.IOException;
-import java.util.Iterator;
+import org.apache.activemq.store.PListTestSupport;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface PList {
-    void setName(String name);
-
-    String getName();
-
-    void destroy() throws IOException;
-
-    void addLast(String id, ByteSequence bs) throws IOException;
-
-    void addFirst(String id, ByteSequence bs) throws IOException;
-
-    boolean remove(String id) throws IOException;
-
-    boolean remove(long position) throws IOException;
+public class LevelDBPlistTest extends PListTestSupport {
 
-    PListEntry get(long position) throws IOException;
-
-    PListEntry getFirst() throws IOException;
-
-    PListEntry getLast() throws IOException;
+    @Override
+    protected LevelDBStore createPListStore() {
+        return new LevelDBStore();
+    }
 
-    boolean isEmpty();
+    protected LevelDBStore createConcurrentAddIteratePListStore() {
+        return new LevelDBStore();
+    }
 
-    PListIterator iterator() throws IOException;
+    @Override
+    protected LevelDBStore createConcurrentAddRemovePListStore() {
+        return new LevelDBStore();
+    }
 
-    long size();
+    @Override
+    protected LevelDBStore createConcurrentAddRemoveWithPreloadPListStore() {
+        return new LevelDBStore();
+    }
 
-    public interface PListIterator extends Iterator<PListEntry> {
-        void release();
+    @Override
+    protected LevelDBStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache) {
+        return new LevelDBStore();
     }
+
 }



Mime
View raw message