activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1104075 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/ activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/ activemq-xmpp/
Date Tue, 17 May 2011 09:16:23 GMT
Author: gtully
Date: Tue May 17 09:16:22 2011
New Revision: 1104075

URL: http://svn.apache.org/viewvc?rev=1104075&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3325 - PList temp store, chunk stream does not exist
when broker under stress. Sync issue around temp store list creation for subs pending message
cursor

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
    activemq/trunk/activemq-xmpp/pom.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1104075&r1=1104074&r2=1104075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Tue May 17 09:16:22 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
@@ -26,8 +27,11 @@ import org.apache.kahadb.journal.Locatio
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.ByteSequence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PList {
+    static final Logger LOG = LoggerFactory.getLogger(PList.class);
     final PListStore store;
     private String name;
     private long rootId = EntryLocation.NOT_SET;
@@ -334,6 +338,25 @@ public class PList {
         return result;
     }
 
+    synchronized public void claimFileLocations(final Set<Integer> candidates) throws IOException {
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    long nextId = rootId;
+                    while (nextId != EntryLocation.NOT_SET) {
+                        EntryLocation entry = getNext(tx, nextId);
+                        if (entry != null) {
+                            candidates.remove(entry.getLocation().getDataFileId());
+                            nextId = entry.getNext();
+                        } else {
+                            break;
+                        }
+                    }
+                }
+            });
+        }
+    }
+
     boolean remove(Transaction tx, String id) throws IOException {
         boolean result = false;
         long nextId = this.rootId;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1104075&r1=1104074&r2=1104075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Tue May 17 09:16:22 2011
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.thread.Scheduler;
@@ -195,46 +194,54 @@ public class PListStore extends ServiceS
         }
     }
 
-    synchronized public PList getPList(final String name) throws Exception {
+    public PList getPList(final String name) throws Exception {
         if (!isStarted()) {
             throw new IllegalStateException("Not started");
         }
         intialize();
-        PList result = this.persistentLists.get(name);
-        if (result == null) {
-            final PList pl = new PList(this);
-            pl.setName(name);
-            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    pl.setRootId(tx.allocate().getPageId());
-                    pl.load(tx);
-                    metaData.storedSchedulers.put(tx, name, pl);
+        synchronized (indexLock) {
+            synchronized (this) {
+                PList result = this.persistentLists.get(name);
+                if (result == null) {
+                    final PList pl = new PList(this);
+                    pl.setName(name);
+                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            pl.setRootId(tx.allocate().getPageId());
+                            pl.load(tx);
+                            metaData.storedSchedulers.put(tx, name, pl);
+                        }
+                    });
+                    result = pl;
+                    this.persistentLists.put(name, pl);
                 }
-            });
-            result = pl;
-            this.persistentLists.put(name, pl);
-        }
-        final PList load = result;
-        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                load.load(tx);
-            }
-        });
+                final PList load = result;
+                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        load.load(tx);
+                    }
+                });
 
-        return result;
+                return result;
+            }
+        }
     }
 
-    synchronized public boolean removePList(final String name) throws Exception {
+    public boolean removePList(final String name) throws Exception {
         boolean result = false;
-        final PList pl = this.persistentLists.remove(name);
-        result = pl != null;
-        if (result) {
-            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    metaData.storedSchedulers.remove(tx, name);
-                    pl.destroy(tx);
+        synchronized (indexLock) {
+            synchronized (this) {
+                final PList pl = this.persistentLists.remove(name);
+                result = pl != null;
+                if (result) {
+                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            metaData.storedSchedulers.remove(tx, name);
+                            pl.destroy(tx);
+                        }
+                    });
                 }
-            });
+            }
         }
         return result;
     }
@@ -324,16 +331,21 @@ public class PListStore extends ServiceS
         try {
             final Set<Integer> candidates = journal.getFileMap().keySet();
             LOG.trace("Full gc candidate set:" + candidates);
-            for (PList list : persistentLists.values()) {
-                PListEntry entry = list.getFirst();
-                while (entry != null) {
-                    claimCandidates(entry, candidates);
-                    entry = list.getNext(entry);
+            if (candidates.size() > 1) {
+                List<PList> plists = null;
+                synchronized (this) {
+                    plists = new ArrayList(persistentLists.values());
+                }
+                for (PList list : plists) {
+                    list.claimFileLocations(candidates);
+                    if (isStopping()) {
+                        return;
+                    }
+                    LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
                 }
-                LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
+                LOG.trace("GC Candidate set:" + candidates);
+                this.journal.removeDataFiles(candidates);
             }
-            LOG.debug("GC Candidate set:" + candidates);
-            this.journal.removeDataFiles(candidates);
         } catch (IOException e) {
             LOG.error("Exception on periodic cleanup: " + e, e);
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1104075&r1=1104074&r2=1104075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Tue May 17 09:16:22 2011
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Vector;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -36,11 +35,16 @@ import org.apache.kahadb.util.ByteSequen
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PListTest {
-
+    static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
     private PListStore store;
     private PList plist;
+    final ByteSequence payload = new ByteSequence(new byte[400]);
+    final String idSeed = new String("Seed");
+    final Vector<Throwable> exceptions = new Vector<Throwable>();
    
 
     @Test
@@ -225,6 +229,175 @@ public class PListTest {
         assertTrue("no exceptions", exceptions.isEmpty());
     }
 
+
+    @Test
+    public void testConcurrentAddLast() throws Exception {
+        File directory = store.getDirectory();
+        store.stop();
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store = new PListStore();
+        store.setDirectory(directory);
+        //store.setJournalMaxFileLength(1024*5);
+        store.start();
+
+
+        final int numThreads = 20;
+        final int iterations = 2000;
+        ExecutorService executor = Executors.newFixedThreadPool(100);
+        for (int i=0; i<numThreads; i++) {
+            new Job(i, PListTest.TaskType.ADD, iterations).run();
+        }
+
+        for (int i=0; i<numThreads; i++) {
+            executor.execute(new Job(i, PListTest.TaskType.ITERATE, iterations));
+        }
+
+        for (int i=0; i<100; i++) {
+            executor.execute(new Job(i+20, PListTest.TaskType.ADD, 100));
+        }
+
+        executor.shutdown();
+        executor.awaitTermination(60*5, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testOverFlow() throws Exception {
+        File directory = store.getDirectory();
+        store.stop();
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store = new PListStore();
+        store.setDirectory(directory);
+        store.start();
+
+        for (int i=0;i<2000; i++) {
+            new Job(i, PListTest.TaskType.ADD, 5).run();
+
+        }
+        LOG.info("After Load index file: " + store.pageFile.getFile().length());
+        LOG.info("After remove index file: " + store.pageFile.getFile().length());
+    }
+
+
+    @Test
+    public void testConcurrentAddRemoveWithPreload() throws Exception {
+        File directory = store.getDirectory();
+        store.stop();
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store = new PListStore();
+        store.setDirectory(directory);
+        store.setJournalMaxFileLength(1024*5);
+        store.setCleanupInterval(5000);
+        store.start();
+
+        final int iterations = 500;
+        final int numLists = 10;
+
+        // prime the store
+
+        // create/delete
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.CREATE, iterations).run();
+        }
+
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.DELETE, iterations).run();
+        }
+
+        // fill
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.ADD, iterations).run();
+        }
+        // empty
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.REMOVE, iterations).run();
+        }
+        // empty
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.DELETE, iterations).run();
+        }
+
+        // fill
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.ADD, iterations).run();
+        }
+
+        // parallel
+        ExecutorService executor = Executors.newFixedThreadPool(100);
+        for (int i=0; i<numLists*2; i++) {
+            executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
+        }
+
+        executor.shutdown();
+        executor.awaitTermination(60*5, TimeUnit.SECONDS);
+        assertTrue("no excepitons", exceptions.isEmpty());
+    }
+
+    enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE}
+
+    class Job implements Runnable {
+
+        int id;
+        TaskType task;
+        int iterations;
+
+        public Job(int id, TaskType t, int iterations) {
+            this.id = id;
+            this.task = t;
+            this.iterations = iterations;
+        }
+
+        @Override
+        public void run() {
+            try {
+                PList plist = null;
+                switch (task) {
+                    case CREATE:
+                        plist = store.getPList("List-" + id);
+                        break;
+                    case DELETE:
+                        store.removePList("List-" + id);
+                        break;
+                    case ADD:
+                        plist = store.getPList("List-" + id);
+
+                        for (int j = 0; j < iterations; j++) {
+                            plist.addLast(idSeed + "id" + j, payload);
+                            if (j > 0 && j % (iterations / 2) == 0) {
+                                LOG.info("Job-" + id + ", Done: " + j);
+                            }
+                        }
+                        break;
+                    case REMOVE:
+                        plist = store.getPList("List-" + id);
+
+                        for (int j = iterations; j > 0; j--) {
+                            plist.remove(idSeed + "id" + j);
+                            if (j > 0 && j % (iterations / 2) == 0) {
+                                LOG.info("Job-" + id + " Done remove: " + j);
+                            }
+                        }
+                        break;
+                    case ITERATE:
+                        plist = store.getPList("List-" + id);
+
+                        PListEntry element = plist.getFirst();
+                        while (element != null) {
+                            element = plist.getNext(element);
+                        }
+                        break;
+                    default:
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+                exceptions.add(e);
+            }
+        }
+    }
+
     @Before
     public void setUp() throws Exception {
         File directory = new File("target/test/PlistDB");
@@ -244,6 +417,7 @@ public class PListTest {
     @After
     public void tearDown() throws Exception {
         store.stop();
+        exceptions.clear();
     }
 
 }

Modified: activemq/trunk/activemq-xmpp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-xmpp/pom.xml?rev=1104075&r1=1104074&r2=1104075&view=diff
==============================================================================
--- activemq/trunk/activemq-xmpp/pom.xml (original)
+++ activemq/trunk/activemq-xmpp/pom.xml Tue May 17 09:16:22 2011
@@ -97,7 +97,6 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <version>3.8.1</version>
       <scope>test</scope>
     </dependency>
     <dependency>



Mime
View raw message