Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3CA6ED146 for ; Sat, 8 Dec 2012 15:21:51 +0000 (UTC) Received: (qmail 55658 invoked by uid 500); 8 Dec 2012 15:21:51 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 55459 invoked by uid 500); 8 Dec 2012 15:21:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 55428 invoked by uid 99); 8 Dec 2012 15:21:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 08 Dec 2012 15:21:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 08 Dec 2012 15:21:40 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5026623888E7; Sat, 8 Dec 2012 15:21:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1418686 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-brok... Date: Sat, 08 Dec 2012 15:21:17 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121208152120.5026623888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Sat Dec 8 15:21:14 2012 New Revision: 1418686 URL: http://svn.apache.org/viewvc?rev=1418686&view=rev Log: Fixes AMQ-4215: Simplify PList interface and provide a LevelDB store implementation. Added: activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java - copied, changed from r1418454, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java - copied, changed from r1418454, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java activemq/trunk/activemq-kahadb-store/pom.xml activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Sat Dec 8 15:21:14 2012 @@ -1530,6 +1530,16 @@ public class BrokerService implements Se if (!isPersistent()) { return null; } + + try { + PersistenceAdapter pa = getPersistenceAdapter(); + if( pa!=null && pa instanceof PListStore) { + return (PListStore) pa; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + boolean result = true; boolean empty = true; try { Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Sat Dec 8 15:21:14 2012 @@ -37,12 +37,14 @@ public class IndirectMessageReference im private boolean acked; /** Direct reference to the message */ private final Message message; + private final MessageId messageId; /** * @param message */ public IndirectMessageReference(final Message message) { this.message = message; + this.messageId = message.getMessageId().copy(); message.getMessageId(); message.getGroupID(); message.getGroupSequence(); @@ -111,7 +113,7 @@ public class IndirectMessageReference im } public MessageId getMessageId() { - return message.getMessageId(); + return messageId; } public Message.MessageDestination getRegionDestination() { Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sat Dec 8 15:21:14 2012 @@ -100,6 +100,9 @@ public class TopicSubscription extends A if (isDuplicate(node)) { return; } + // Lets use an indirect reference so that we can associate a unique + // locator /w the message. + node = new IndirectMessageReference(node.getMessage()); enqueueCounter.incrementAndGet(); if (!isFull() && matched.isEmpty()) { // if maximumPendingMessages is set we will only discard messages which @@ -540,7 +543,7 @@ public class TopicSubscription extends A } private void dispatch(final MessageReference node) throws IOException { - Message message = (Message)node; + Message message = node.getMessage(); if (node != null) { node.incrementReferenceCount(); } Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Sat Dec 8 15:21:14 2012 @@ -278,7 +278,8 @@ public class FilePendingMessageCursor ex systemUsage.getTempUsage().waitForSpace(); node.decrementReferenceCount(); ByteSequence bs = getByteSequence(node.getMessage()); - getDiskList().addFirst(node.getMessageId().toString(), bs); + Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs); + node.getMessageId().setPlistLocator(locator); } catch (Exception e) { LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e); @@ -335,7 +336,7 @@ public class FilePendingMessageCursor ex } if (!isDiskListEmpty()) { try { - getDiskList().remove(node.getMessageId().toString()); + getDiskList().remove(node.getMessageId().getPlistLocator()); } catch (IOException e) { throw new RuntimeException(e); } @@ -506,7 +507,9 @@ public class FilePendingMessageCursor ex public MessageReference next() { try { PListEntry entry = iterator.next(); - return getMessage(entry.getByteSequence()); + Message message = getMessage(entry.getByteSequence()); + message.getMessageId().setPlistLocator(entry.getLocator()); + return message; } catch (IOException e) { LOG.error("I/O error", e); throw new RuntimeException(e); Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java Sat Dec 8 15:21:14 2012 @@ -16,7 +16,9 @@ */ package org.apache.activemq.store; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; import java.io.IOException; import java.util.Iterator; @@ -25,30 +27,17 @@ import java.util.Iterator; * @author Hiram Chirino */ public interface PList { - void setName(String name); - String getName(); void destroy() throws IOException; - void addLast(String id, ByteSequence bs) throws IOException; - - void addFirst(String id, ByteSequence bs) throws IOException; - - boolean remove(String id) throws IOException; - - boolean remove(long position) throws IOException; + Object addFirst(String id, ByteSequence bs) throws IOException; + Object addLast(String id, ByteSequence bs) throws IOException; - PListEntry get(long position) throws IOException; - - PListEntry getFirst() throws IOException; - - PListEntry getLast() throws IOException; + boolean remove(Object position) throws IOException; boolean isEmpty(); - PListIterator iterator() throws IOException; - long size(); public interface PListIterator extends Iterator { Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PListEntry.java Sat Dec 8 15:21:14 2012 @@ -22,10 +22,12 @@ public class PListEntry { private final ByteSequence byteSequence; private final String entry; + private final Object locator; - public PListEntry(String entry, ByteSequence bs) { + public PListEntry(String entry, ByteSequence bs, Object locator) { this.entry = entry; this.byteSequence = bs; + this.locator = locator; } public ByteSequence getByteSequence() { @@ -36,7 +38,11 @@ public class PListEntry { return this.entry; } + public Object getLocator() { + return locator; + } + public PListEntry copy() { - return new PListEntry(this.entry, this.byteSequence); + return new PListEntry(this.entry, this.byteSequence, locator); } } Copied: activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java (from r1418454, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java?p2=activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java&r1=1418454&r2=1418686&rev=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original) +++ activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java Sat Dec 8 15:21:14 2012 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.store.kahadb.plist; +package org.apache.activemq.store; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -22,29 +22,25 @@ import static org.junit.Assert.assertTru import java.io.File; import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Vector; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.activemq.store.PList; -import org.apache.activemq.store.PListEntry; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ByteSequence; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PListTest { - static final Logger LOG = LoggerFactory.getLogger(PListTest.class); - private PListStoreImpl store; - private PListImpl plist; +public abstract class PListTestSupport { + static final Logger LOG = LoggerFactory.getLogger(PListTestSupport.class); + private PListStore store; + private PList plist; final ByteSequence payload = new ByteSequence(new byte[400]); final String idSeed = new String("Seed" + new byte[1024]); final Vector exceptions = new Vector(); @@ -53,52 +49,73 @@ public class PListTest { @Test public void testAddLast() throws Exception { final int COUNT = 1000; - Map map = new LinkedHashMap(); + LinkedList list = new LinkedList(); for (int i = 0; i < COUNT; i++) { String test = new String("test" + i); ByteSequence bs = new ByteSequence(test.getBytes()); - map.put(test, bs); + list.addLast(bs); plist.addLast(test, bs); } assertEquals(plist.size(), COUNT); - int count = 0; - for (ByteSequence bs : map.values()) { + + PList.PListIterator actual = plist.iterator(); + Iterator expected = list.iterator(); + while (expected.hasNext()) { + ByteSequence bs = expected.next(); + assertTrue(actual.hasNext()); + PListEntry entry = actual.next(); String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength()); - PListEntry entry = plist.get(count); String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength()); assertEquals(origStr, plistString); - count++; } + assertFalse(actual.hasNext()); } @Test public void testAddFirst() throws Exception { - final int COUNT = 1000; - Map map = new LinkedHashMap(); - for (int i = 0; i < COUNT; i++) { - String test = new String("test" + i); - ByteSequence bs = new ByteSequence(test.getBytes()); - map.put(test, bs); - plist.addFirst(test, bs); - } - assertEquals(plist.size(), COUNT); - long count = plist.size() - 1; - for (ByteSequence bs : map.values()) { - String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength()); - PListEntry entry = plist.get(count); - String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), - entry.getByteSequence().getLength()); - assertEquals(origStr, plistString); - count--; - } - } + final int COUNT = 1000; + LinkedList list = new LinkedList(); + for (int i = 0; i < COUNT; i++) { + String test = new String("test" + i); + ByteSequence bs = new ByteSequence(test.getBytes()); + list.addFirst(bs); + plist.addFirst(test, bs); + } + assertEquals(plist.size(), COUNT); + + PList.PListIterator actual = plist.iterator(); + Iterator expected = list.iterator(); + while (expected.hasNext()) { + ByteSequence bs = expected.next(); + assertTrue(actual.hasNext()); + PListEntry entry = actual.next(); + String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength()); + String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), + entry.getByteSequence().getLength()); + assertEquals(origStr, plistString); + } + assertFalse(actual.hasNext()); + } @Test public void testRemove() throws IOException { doTestRemove(2000); } + private PListEntry getFirst(PList plist) throws IOException { + PList.PListIterator iterator = plist.iterator(); + try { + if( iterator.hasNext() ) { + return iterator.next(); + } else { + return null; + } + }finally { + iterator.release(); + } + } + protected void doTestRemove(final int COUNT) throws IOException { Map map = new LinkedHashMap(); for (int i = 0; i < COUNT; i++) { @@ -108,10 +125,10 @@ public class PListTest { plist.addLast(test, bs); } assertEquals(plist.size(), COUNT); - PListEntry entry = plist.getFirst(); + PListEntry entry = getFirst(plist); while (entry != null) { - plist.remove(entry.getId()); - entry = plist.getFirst(); + plist.remove(entry.getLocator()); + entry = getFirst(plist); } assertEquals(0,plist.size()); @@ -140,12 +157,12 @@ public class PListTest { @Test public void testRemoveSecond() throws Exception { - plist.addLast("First", new ByteSequence("A".getBytes())); - plist.addLast("Second", new ByteSequence("B".getBytes())); + Object first = plist.addLast("First", new ByteSequence("A".getBytes())); + Object second = plist.addLast("Second", new ByteSequence("B".getBytes())); - assertTrue(plist.remove("Second")); - assertTrue(plist.remove("First")); - assertFalse(plist.remove("doesNotExist")); + assertTrue(plist.remove(second)); + assertTrue(plist.remove(first)); + assertFalse(plist.remove(first)); } @Test @@ -161,12 +178,12 @@ public class PListTest { @Test public void testRemoveSecondPosition() throws Exception { - plist.addLast("First", new ByteSequence("A".getBytes())); - plist.addLast("Second", new ByteSequence("B".getBytes())); + Object first = plist.addLast("First", new ByteSequence("A".getBytes())); + Object second = plist.addLast("Second", new ByteSequence("B".getBytes())); - assertTrue(plist.remove(1)); - assertTrue(plist.remove(0)); - assertFalse(plist.remove(0)); + assertTrue(plist.remove(second)); + assertTrue(plist.remove(first)); + assertFalse(plist.remove(first)); } @Test @@ -175,11 +192,8 @@ public class PListTest { store.stop(); IOHelper.mkdirs(directory); IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setCleanupInterval(400); + store = createConcurrentAddRemovePListStore(); store.setDirectory(directory); - store.setJournalMaxFileLength(1024*5); - store.setLazyInit(false); store.start(); final ByteSequence payload = new ByteSequence(new byte[1024*2]); @@ -207,9 +221,9 @@ public class PListTest { PList candidate = lists[i%numLists]; Thread.currentThread().setName("ALRF:"+candidate.getName()); synchronized (plistLocks(candidate)) { - candidate.addLast(String.valueOf(i), payload); - candidate.getFirst(); - assertTrue(candidate.remove(String.valueOf(i))); + Object last = candidate.addLast(String.valueOf(i), payload); + getFirst(candidate); + assertTrue(candidate.remove(last)); } } } catch (Exception error) { @@ -231,9 +245,9 @@ public class PListTest { PList candidate = lists[i%numLists]; Thread.currentThread().setName("ALRF:"+candidate.getName()); synchronized (plistLocks(candidate)) { - candidate.addLast(String.valueOf(i), payload); - candidate.getFirst(); - assertTrue(candidate.remove(String.valueOf(i))); + Object last = candidate.addLast(String.valueOf(i), payload); + getFirst(candidate); + assertTrue(candidate.remove(last)); } } } catch (Exception error) { @@ -259,13 +273,15 @@ public class PListTest { assertTrue("finished ok", finishedInTime); } + protected abstract PListStore createConcurrentAddRemovePListStore(); + @Test public void testConcurrentAddLast() throws Exception { File directory = store.getDirectory(); store.stop(); IOHelper.mkdirs(directory); IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); + store = createPListStore(); store.setDirectory(directory); store.start(); @@ -274,15 +290,15 @@ public class PListTest { final int iterations = 1000; executor = Executors.newFixedThreadPool(100); for (int i=0; i=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations)); + executor.execute(new Job(i, i>=numLists ? PListTestSupport.TaskType.ADD : PListTestSupport.TaskType.REMOVE, iterations)); } executor.shutdown(); @@ -374,6 +387,8 @@ public class PListTest { assertTrue("finished ok", finishedInTime); } + protected abstract PListStore createConcurrentAddRemoveWithPreloadPListStore(); + // for non determinant issues, increasing this may help diagnose final int numRepeats = 1; @@ -396,9 +411,7 @@ public class PListTest { store.stop(); IOHelper.mkdirs(directory); IOHelper.deleteChildren(directory); - store = new PListStoreImpl(); - store.setIndexEnablePageCaching(enablePageCache); - store.setIndexPageSize(2*1024); + store = createConcurrentAddIterateRemovePListStore(enablePageCache); store.setDirectory(directory); store.start(); @@ -407,12 +420,12 @@ public class PListTest { LOG.info("create"); for (int i=0; i entries = new ConcurrentHashMap(); class Job implements Runnable { @@ -504,7 +518,7 @@ public class PListTest { public void run() { final String threadName = Thread.currentThread().getName(); try { - PListImpl plist = null; + PList plist = null; switch (task) { case CREATE: Thread.currentThread().setName("C:"+id); @@ -522,7 +536,8 @@ public class PListTest { for (int j = 0; j < iterations; j++) { synchronized (plistLocks(plist)) { if (exceptions.isEmpty()) { - plist.addLast ("PL>" + id + idSeed + "-" + j, payload); + String key = "PL>" + id + idSeed + "-" + j; + entries.put(key, plist.addLast(key, payload)); } else { break; } @@ -539,7 +554,11 @@ public class PListTest { synchronized (plistLocks(plist)) { for (int j = iterations -1; j >= 0; j--) { - plist.remove("PL>" + id + idSeed + "-" + j); + String key = "PL>" + id + idSeed + "-" + j; + Object position = entries.remove(key); + if( position!=null ) { + plist.remove(position); + } if (j > 0 && j % (iterations / 2) == 0) { LOG.info("Job-" + id + " Done remove: " + j); } @@ -627,12 +646,14 @@ public class PListTest { } protected void startStore(File directory) throws Exception { - store = new PListStoreImpl(); + store = createPListStore(); store.setDirectory(directory); store.start(); plist = store.getPList("main"); } + abstract protected PListStore createPListStore(); + @After public void tearDown() throws Exception { if (executor != null) { Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java (original) +++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java Sat Dec 8 15:21:14 2012 @@ -35,6 +35,7 @@ public class MessageId implements DataSt private transient AtomicReference dataLocator = new AtomicReference(); private transient Object entryLocator; + private transient Object plistLocator; public MessageId() { this.producerId = new ProducerId(); @@ -153,6 +154,8 @@ public class MessageId implements DataSt copy.key = key; copy.brokerSequenceId = brokerSequenceId; copy.dataLocator = new AtomicReference(dataLocator != null ? dataLocator.get() : null); + copy.entryLocator = entryLocator; + copy.plistLocator = plistLocator; return copy; } @@ -192,4 +195,12 @@ public class MessageId implements DataSt public void setEntryLocator(Object entryLocator) { this.entryLocator = entryLocator; } + + public Object getPlistLocator() { + return plistLocator; + } + + public void setPlistLocator(Object plistLocator) { + this.plistLocator = plistLocator; + } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java Sat Dec 8 15:21:14 2012 @@ -23,10 +23,7 @@ import org.apache.activemq.broker.Connec import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.*; import org.apache.activemq.usage.SystemUsage; import org.apache.derby.iapi.jdbc.BrokeredStatement; import org.junit.After; @@ -52,6 +49,7 @@ public class PriorityNetworkDispatchPoli info.setConsumerId(id); info.setNetworkSubscription(true); info.setNetworkConsumerPath(new ConsumerId[]{id}); + node.setMessageId(new MessageId("test:1:1:1:1")); } @After Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java Sat Dec 8 15:21:14 2012 @@ -46,6 +46,7 @@ public class FilePendingMessageCursorTes private void createBrokerWithTempStoreLimit() throws Exception { brokerService = new BrokerService(); + brokerService.setUseJmx(false); SystemUsage usage = brokerService.getSystemUsage(); usage.getTempUsage().setLimit(1025*1024*15); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java Sat Dec 8 15:21:14 2012 @@ -22,6 +22,7 @@ public class KahaDBFilePendingMessageCur @Test public void testAddRemoveAddIndexSize() throws Exception { brokerService = new BrokerService(); + brokerService.setUseJmx(false); SystemUsage usage = brokerService.getSystemUsage(); usage.getMemoryUsage().setLimit(1024*150); String body = new String(new byte[1024]); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Sat Dec 8 15:21:14 2012 @@ -50,6 +50,19 @@ public class PListTest { final Vector exceptions = new Vector(); ExecutorService executor; + private PListEntry getFirst(PList plist) throws IOException { + PList.PListIterator iterator = plist.iterator(); + try { + if( iterator.hasNext() ) { + return iterator.next(); + } else { + return null; + } + }finally { + iterator.release(); + } + } + @Test public void testAddLast() throws Exception { final int COUNT = 1000; @@ -208,7 +221,7 @@ public class PListTest { Thread.currentThread().setName("ALRF:"+candidate.getName()); synchronized (plistLocks(candidate)) { candidate.addLast(String.valueOf(i), payload); - candidate.getFirst(); + getFirst(candidate); assertTrue(candidate.remove(String.valueOf(i))); } } @@ -232,7 +245,7 @@ public class PListTest { Thread.currentThread().setName("ALRF:"+candidate.getName()); synchronized (plistLocks(candidate)) { candidate.addLast(String.valueOf(i), payload); - candidate.getFirst(); + getFirst(candidate); assertTrue(candidate.remove(String.valueOf(i))); } } Modified: activemq/trunk/activemq-kahadb-store/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/pom.xml?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-kahadb-store/pom.xml (original) +++ activemq/trunk/activemq-kahadb-store/pom.xml Sat Dec 8 15:21:14 2012 @@ -124,6 +124,12 @@ + ${project.groupId} + activemq-broker + test-jar + test + + junit junit test Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java (original) +++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java Sat Dec 8 15:21:14 2012 @@ -26,6 +26,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.Message; import org.apache.activemq.store.PList; import org.apache.activemq.store.PListEntry; import org.apache.activemq.store.kahadb.disk.index.ListIndex; @@ -34,6 +36,7 @@ import org.apache.activemq.store.kahadb. import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; +import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +54,6 @@ public class PListImpl extends ListIndex setValueMarshaller(LocationMarshaller.INSTANCE); } - @Override public void setName(String name) { this.name = name; } @@ -81,8 +83,20 @@ public class PListImpl extends ListIndex } } + class Locator { + final String id; + + Locator(String id) { + this.id = id; + } + + PListImpl plist() { + return PListImpl.this; + } + } + @Override - public void addLast(final String id, final ByteSequence bs) throws IOException { + public Object addLast(final String id, final ByteSequence bs) throws IOException { final Location location = this.store.write(bs, false); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure() { @@ -91,10 +105,11 @@ public class PListImpl extends ListIndex } }); } + return new Locator(id); } @Override - public void addFirst(final String id, final ByteSequence bs) throws IOException { + public Object addFirst(final String id, final ByteSequence bs) throws IOException { final Location location = this.store.write(bs, false); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure() { @@ -103,9 +118,17 @@ public class PListImpl extends ListIndex } }); } + return new Locator(id); } @Override + public boolean remove(final Object l) throws IOException { + Locator locator = (Locator) l; + assert locator!=null; + assert locator.plist()==this; + return remove(locator.id); + } + public boolean remove(final String id) throws IOException { final AtomicBoolean result = new AtomicBoolean(); synchronized (indexLock) { @@ -118,7 +141,6 @@ public class PListImpl extends ListIndex return result.get(); } - @Override public boolean remove(final long position) throws IOException { final AtomicBoolean result = new AtomicBoolean(); synchronized (indexLock) { @@ -138,7 +160,6 @@ public class PListImpl extends ListIndex return result.get(); } - @Override public PListEntry get(final long position) throws IOException { PListEntry result = null; final AtomicReference> ref = new AtomicReference>(); @@ -152,12 +173,11 @@ public class PListImpl extends ListIndex } if (ref.get() != null) { ByteSequence bs = this.store.getPayload(ref.get().getValue()); - result = new PListEntry(ref.get().getKey(), bs); + result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); } return result; } - @Override public PListEntry getFirst() throws IOException { PListEntry result = null; final AtomicReference> ref = new AtomicReference>(); @@ -170,12 +190,11 @@ public class PListImpl extends ListIndex } if (ref.get() != null) { ByteSequence bs = this.store.getPayload(ref.get().getValue()); - result = new PListEntry(ref.get().getKey(), bs); + result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); } return result; } - @Override public PListEntry getLast() throws IOException { PListEntry result = null; final AtomicReference> ref = new AtomicReference>(); @@ -188,7 +207,7 @@ public class PListImpl extends ListIndex } if (ref.get() != null) { ByteSequence bs = this.store.getPayload(ref.get().getValue()); - result = new PListEntry(ref.get().getKey(), bs); + result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); } return result; } @@ -230,7 +249,7 @@ public class PListImpl extends ListIndex e.initCause(unexpected); throw e; } - return new PListEntry(entry.getKey(), bs); + return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey())); } @Override Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java (original) +++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java Sat Dec 8 15:21:14 2012 @@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.JournaledStore; import org.apache.activemq.store.PList; import org.apache.activemq.store.PListStore; @@ -31,6 +32,7 @@ import org.apache.activemq.store.kahadb. import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.util.*; +import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java?rev=1418686&view=auto ============================================================================== --- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java (added) +++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/plist/PListImplTest.java Sat Dec 8 15:21:14 2012 @@ -0,0 +1,52 @@ +package org.apache.activemq.store.kahadb.plist; + +import org.apache.activemq.store.PListStore; +import org.apache.activemq.store.PListTestSupport; + +/** + * @author Hiram Chirino + */ +public class PListImplTest extends PListTestSupport { + + + @Override + protected PListStoreImpl createPListStore() { + return new PListStoreImpl(); + } + + protected PListStore createConcurrentAddIteratePListStore() { + PListStoreImpl store = createPListStore(); + store.setIndexPageSize(2 * 1024); + store.setJournalMaxFileLength(1024 * 1024); + store.setCleanupInterval(-1); + store.setIndexEnablePageCaching(false); + store.setIndexWriteBatchSize(100); + return store; + } + + @Override + protected PListStore createConcurrentAddRemovePListStore() { + PListStoreImpl store = createPListStore(); + store.setCleanupInterval(400); + store.setJournalMaxFileLength(1024*5); + store.setLazyInit(false); + return store; + } + + @Override + protected PListStore createConcurrentAddRemoveWithPreloadPListStore() { + PListStoreImpl store = createPListStore(); + store.setJournalMaxFileLength(1024*5); + store.setCleanupInterval(5000); + store.setIndexWriteBatchSize(500); + return store; + } + + @Override + protected PListStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache) { + PListStoreImpl store = createPListStore(); + store.setIndexEnablePageCaching(enablePageCache); + store.setIndexPageSize(2*1024); + return store; + } +} Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original) +++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Sat Dec 8 15:21:14 2012 @@ -32,6 +32,15 @@ import util.TimeMetric import java.util.HashMap import collection.mutable.{HashSet, ListBuffer} import org.apache.activemq.util.ByteSequence +import org.apache.activemq.leveldb.QueueEntryRecord +import util.TimeMetric +import org.apache.activemq.leveldb.SubAckRecord +import scala.Some +import org.apache.activemq.leveldb.UowManagerConstants.QueueEntryKey +import org.apache.activemq.leveldb.CountDownFuture +import org.apache.activemq.leveldb.XaAckRecord +import org.apache.activemq.leveldb.MessageRecord +import org.apache.activemq.leveldb.DurableSubscription case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) { var locator:(Long, Int) = _ @@ -359,6 +368,7 @@ class DelayableUOW(val manager:DBManager class DBManager(val parent:LevelDBStore) { var lastCollectionKey = new AtomicLong(0) + var lastPListKey = new AtomicLong(0) val client:LevelDBClient = parent.createClient def writeExecutor = client.writeExecutor @@ -658,10 +668,10 @@ class DBManager(val parent:LevelDBStore) } def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = { - parent.createQueueMessageStore(dest, createStore(dest, QUEUE_COLLECTION_TYPE)) + parent.createQueueMessageStore(dest, createCollection(utf8(dest.getQualifiedName), QUEUE_COLLECTION_TYPE)) } def destroyQueueStore(key:Long) = writeExecutor.sync { - client.removeCollection(key) + client.removeCollection(key) } def getLogAppendPosition = writeExecutor.sync { @@ -697,14 +707,14 @@ class DBManager(val parent:LevelDBStore) } def createTopicStore(dest:ActiveMQTopic) = { - var key = createStore(dest, TOPIC_COLLECTION_TYPE) + var key = createCollection(utf8(dest.getQualifiedName), TOPIC_COLLECTION_TYPE) parent.createTopicMessageStore(dest, key) } - def createStore(destination:ActiveMQDestination, collectionType:Int) = { + def createCollection(name:Buffer, collectionType:Int) = { val collection = new CollectionRecord.Bean() collection.setType(collectionType) - collection.setMeta(utf8(destination.getQualifiedName)) + collection.setMeta(name) collection.setKey(lastCollectionKey.incrementAndGet()) val buffer = collection.freeze() buffer.toFramedBuffer // eager encode the record. @@ -714,19 +724,10 @@ class DBManager(val parent:LevelDBStore) collection.getKey } - def createTransactionContainer(name:XATransactionId) = { - val collection = new CollectionRecord.Bean() - collection.setType(TRANSACTION_COLLECTION_TYPE) - var packet = parent.wireFormat.marshal(name) - collection.setMeta(new Buffer(packet.data, packet.offset, packet.length)) - collection.setKey(lastCollectionKey.incrementAndGet()) - val buffer = collection.freeze() - buffer.toFramedBuffer // eager encode the record. - writeExecutor.sync { - client.addCollection(buffer) - } - collection.getKey - } + def buffer(packet:ByteSequence) = new Buffer(packet.data, packet.offset, packet.length) + + def createTransactionContainer(id:XATransactionId) = + createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE) def removeTransactionContainer(key:Long) = { // writeExecutor.sync { client.removeCollection(key) @@ -773,6 +774,18 @@ class DBManager(val parent:LevelDBStore) lastCollectionKey.set(last) } + def createPList(name:String):LevelDBStore#LevelDBPList = { + parent.createPList(name, lastPListKey.incrementAndGet()) + } + + def destroyPList(key:Long) = writeExecutor.sync { + client.removePlist(key) + } + + def plistPut(key:Array[Byte], value:Array[Byte]) = client.plistPut(key, value) + def plistGet(key:Array[Byte]) = client.plistGet(key) + def plistDelete(key:Array[Byte]) = client.plistDelete(key) + def plistIterator = client.plistIterator def getMessage(x: MessageId):Message = { val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x) Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original) +++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Sat Dec 8 15:21:14 2012 @@ -27,16 +27,18 @@ import org.iq80.leveldb._ import org.fusesource.hawtdispatch._ import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord} -import util._ +import org.apache.activemq.leveldb.util._ import java.util.concurrent._ import org.fusesource.hawtbuf._ import java.io.{ObjectInputStream, ObjectOutputStream, File} import scala.Option._ -import org.apache.activemq.command.{MessageAck, DataStructure, Message} +import org.apache.activemq.command.{MessageAck, Message} import org.apache.activemq.util.ByteSequence -import org.apache.activemq.leveldb.RecordLog.LogInfo import java.text.SimpleDateFormat import java.util.{Date, Collections} +import org.apache.activemq.leveldb.util.TimeMetric +import org.apache.activemq.leveldb.RecordLog.LogInfo +import scala.Some /** * @author Hiram Chirino @@ -58,6 +60,8 @@ object LevelDBClient extends Log { override def shutdownNow = Collections.emptyList[Runnable] } + val PLIST_WRITE_OPTIONS = new WriteOptions().sync(false) + final val DIRTY_INDEX_KEY = bytes(":dirty") final val LOG_REF_INDEX_KEY = bytes(":log-refs") final val COLLECTION_META_KEY = bytes(":collection-meta") @@ -112,6 +116,18 @@ object LevelDBClient extends Log { (in.readVarLong(), in.readVarInt()) } + def encodeLongLong(a1:Long, a2:Long) = { + val out = new DataByteArrayOutputStream(8) + out.writeLong(a1) + out.writeLong(a2) + out.toBuffer + } + + def decodeLongLong(bytes:Array[Byte]):(Long, Long) = { + val in = new DataByteArrayInputStream(bytes) + (in.readLong(), in.readLong()) + } + def encodeLong(a1:Long) = { val out = new DataByteArrayOutputStream(8) out.writeLong(a1) @@ -404,6 +420,7 @@ class LevelDBClient(store: LevelDBStore) var log:RecordLog = _ var index:RichDB = _ + var plist:RichDB = _ var indexOptions:Options = _ var lastIndexSnapshotPos:Long = _ @@ -414,6 +431,7 @@ class LevelDBClient(store: LevelDBStore) val collectionMeta = HashMap[Long, CollectionMeta]() + def plistIndexFile = directory / ("plist"+INDEX_SUFFIX) def dirtyIndexFile = directory / ("dirty"+INDEX_SUFFIX) def tempIndexFile = directory / ("temp"+INDEX_SUFFIX) def snapshotIndexFile(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX) @@ -526,6 +544,11 @@ class LevelDBClient(store: LevelDBStore) retry { + // Setup the plist index. + plistIndexFile.recursiveDelete + plistIndexFile.mkdirs() + plist = new RichDB(factory.open(plistIndexFile, indexOptions)); + // Delete the dirty indexes dirtyIndexFile.recursiveDelete dirtyIndexFile.mkdirs() @@ -1275,4 +1298,23 @@ class LevelDBClient(store: LevelDBStore) } } + def removePlist(collectionKey: Long) = { + val entryKeyPrefix = encodeLong(collectionKey) + collectionMeta.remove(collectionKey) + retry { + val ro = new ReadOptions + ro.fillCache(false) + ro.verifyChecksums(false) + plist.cursorPrefixed(entryKeyPrefix, ro) { (key, value)=> + plist.delete(key) + true + } + } + } + + def plistPut(key:Array[Byte], value:Array[Byte]) = plist.put(key, value, PLIST_WRITE_OPTIONS) + def plistDelete(key:Array[Byte]) = plist.delete(key, PLIST_WRITE_OPTIONS) + def plistGet(key:Array[Byte]) = plist.get(key) + def plistIterator = plist.db.iterator() + } Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1418686&r1=1418685&r2=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original) +++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Sat Dec 8 15:21:14 2012 @@ -23,8 +23,8 @@ import org.apache.activemq.openwire.Open import org.apache.activemq.usage.SystemUsage import java.io.File import java.io.IOException -import java.util.concurrent.{CountDownLatch, ExecutionException, Future} -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ExecutionException, Future} +import java.util.concurrent.atomic.{AtomicLong, AtomicInteger} import reflect.BeanProperty import org.apache.activemq.store._ import java.util._ @@ -34,6 +34,9 @@ import javax.management.ObjectName import org.apache.activemq.broker.jmx.AnnotatedMBean import org.apache.activemq.util._ import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log} +import org.apache.activemq.store.PList.PListIterator +import java.lang +import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream, Buffer} object LevelDBStore extends Log { val DEFAULT_DIRECTORY = new File("LevelDB"); @@ -111,7 +114,7 @@ class LevelDBStoreView(val store:LevelDB import LevelDBStore._ -class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore { +class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore with PListStore { final val wireFormat = new OpenWireFormat final val db = new DBManager(this) @@ -157,6 +160,7 @@ class LevelDBStore extends LockableServi val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]() val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]() val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() + val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]() def init() = {} @@ -299,7 +303,9 @@ class LevelDBStore extends LockableServi if( prepared ) { uow.dequeue(xacontainer_id, message.getMessageId) } - message.setMessageId(message.getMessageId.copy()) + var copy = message.getMessageId.copy() + copy.setEntryLocator(null) + message.setMessageId(copy) store.doAdd(uow, message, delay) } @@ -370,7 +376,6 @@ class LevelDBStore extends LockableServi preCommit.run() transactions.remove(txid) match { case None=> - println("The transaction does not exist") postCommit.run() case Some(tx)=> val done = new CountDownLatch(1) @@ -433,6 +438,31 @@ class LevelDBStore extends LockableServi } } + + def getPList(name: String): PList = { + this.synchronized(plists.get(name)).getOrElse(db.createPList(name)) + } + + def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = { + var rc = new LevelDBPList(name, key) + this.synchronized { + plists.put(name, rc) + } + rc + } + + def removePList(name: String): Boolean = { + plists.remove(name) match { + case Some(list)=> + db.destroyPList(list.key) + list.listSize.set(0) + true + case None => + false + } + } + + def createMessageStore(destination: ActiveMQDestination):LevelDBStore#LevelDBMessageStore = { destination match { case destination:ActiveMQQueue => @@ -531,6 +561,7 @@ class LevelDBStore extends LockableServi override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false) override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = { + message.getMessageId.setEntryLocator(null) if( message.getTransactionId!=null ) { transaction(message.getTransactionId).add(this, message, delay) DONE @@ -765,6 +796,101 @@ class LevelDBStore extends LockableServi } } + class LevelDBPList(val name: String, val key: Long) extends PList { + import LevelDBClient._ + + val lastSeq = new AtomicLong(Long.MaxValue/2) + val firstSeq = new AtomicLong(lastSeq.get+1) + val listSize = new AtomicLong(0) + + def getName: String = name + def destroy() = { + removePList(name) + } + + def addFirst(id: String, bs: ByteSequence): AnyRef = { + var pos = lastSeq.decrementAndGet() + add(pos, id, bs) + listSize.incrementAndGet() + new lang.Long(pos) + } + + def addLast(id: String, bs: ByteSequence): AnyRef = { + var pos = lastSeq.incrementAndGet() + add(pos, id, bs) + listSize.incrementAndGet() + new lang.Long(pos) + } + + def add(pos:Long, id: String, bs: ByteSequence) = { + val encoded_key = encodeLongLong(key, pos) + val encoded_id = new UTF8Buffer(id) + val os = new DataByteArrayOutputStream(2+encoded_id.length+bs.length) + os.writeShort(encoded_id.length) + os.write(encoded_id.data, encoded_id.offset, encoded_id.length) + os.write(bs.getData, bs.getOffset, bs.getLength) + db.plistPut(encoded_key, os.toBuffer.toByteArray) + } + + def remove(position: AnyRef): Boolean = { + val pos = position.asInstanceOf[lang.Long].longValue() + val encoded_key = encodeLongLong(key, pos) + db.plistGet(encoded_key) match { + case Some(value) => + db.plistDelete(encoded_key) + listSize.decrementAndGet() + true + case None => + false + } + } + + def isEmpty = size()==0 + def size(): Long = listSize.get() + + def iterator() = new PListIterator() { + val prefix = LevelDBClient.encodeLong(key) + var dbi = db.plistIterator + var last_key:Array[Byte] = _ + + dbi.seek(prefix); + + + def hasNext: Boolean = dbi!=null && dbi.hasNext && dbi.peekNext.getKey.startsWith(prefix) + def next() = { + if ( dbi==null || !dbi.hasNext ) { + throw new NoSuchElementException(); + } + val n = dbi.peekNext(); + last_key = n.getKey + val (k, pos) = decodeLongLong(last_key) + if( k!=key ) { + throw new NoSuchElementException(); + } + var value = n.getValue + val is = new org.fusesource.hawtbuf.DataByteArrayInputStream(value) + val id = is.readBuffer(is.readShort()).utf8().toString + val data = new ByteSequence(value, is.getPos, value.length-is.getPos) + dbi.next() + new PListEntry(id, data, pos) + } + + def release() = { + dbi.close() + dbi = null + } + + def remove() = { + if( last_key==null ) { + throw new NoSuchElementException(); + } + db.plistDelete(last_key) + listSize.decrementAndGet() + last_key = null + } + } + + } /////////////////////////////////////////////////////////////////////////// // The following methods actually have nothing to do with JMS txs... It's more like Copied: activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java (from r1418454, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java?p2=activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java&p1=activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java&r1=1418454&r2=1418686&rev=1418686&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/store/PList.java (original) +++ activemq/trunk/activemq-leveldb-store/src/test/scala/org/apache/activemq/leveldb/LevelDBPlistTest.java Sat Dec 8 15:21:14 2012 @@ -14,44 +14,37 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.store; +package org.apache.activemq.leveldb; -import org.apache.activemq.util.ByteSequence; - -import java.io.IOException; -import java.util.Iterator; +import org.apache.activemq.store.PListTestSupport; /** * @author Hiram Chirino */ -public interface PList { - void setName(String name); - - String getName(); - - void destroy() throws IOException; - - void addLast(String id, ByteSequence bs) throws IOException; - - void addFirst(String id, ByteSequence bs) throws IOException; - - boolean remove(String id) throws IOException; - - boolean remove(long position) throws IOException; +public class LevelDBPlistTest extends PListTestSupport { - PListEntry get(long position) throws IOException; - - PListEntry getFirst() throws IOException; - - PListEntry getLast() throws IOException; + @Override + protected LevelDBStore createPListStore() { + return new LevelDBStore(); + } - boolean isEmpty(); + protected LevelDBStore createConcurrentAddIteratePListStore() { + return new LevelDBStore(); + } - PListIterator iterator() throws IOException; + @Override + protected LevelDBStore createConcurrentAddRemovePListStore() { + return new LevelDBStore(); + } - long size(); + @Override + protected LevelDBStore createConcurrentAddRemoveWithPreloadPListStore() { + return new LevelDBStore(); + } - public interface PListIterator extends Iterator { - void release(); + @Override + protected LevelDBStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache) { + return new LevelDBStore(); } + }