activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6152
Date Wed, 26 Apr 2017 17:30:48 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.11.x 695db6464 -> de681b937


https://issues.apache.org/jira/browse/AMQ-6152

Ensure that when add / remove commands are colocated they don't prevent
the log from being GC'd once it is unreferenced.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/de681b93
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/de681b93
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/de681b93

Branch: refs/heads/activemq-5.11.x
Commit: de681b93757010d86f566ed148bfa5e60bd843e4
Parents: 695db64
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Feb 2 20:28:24 2016 -0500
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Wed Apr 26 18:51:25 2017 +0200

----------------------------------------------------------------------
 .../kahadb/scheduler/JobSchedulerImpl.java      | 16 ++++++--
 .../kahadb/scheduler/JobSchedulerStoreImpl.java |  8 ++--
 .../JobSchedulerStoreCheckpointTest.java        | 42 +++++++++++++++++++-
 3 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/de681b93/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index bcb819c..82b9ff5 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -505,8 +505,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
             // now that the job is removed from the index we can store the remove info and
             // then dereference the log files that hold the initial add command and the most
-            // recent update command.
-            this.store.referenceRemovedLocation(tx, location, removed);
+            // recent update command.  If the remove and the add that created the job are in
+            // the same file we don't need to track it and just let a normal GC of the logs
+            // remove it when the log is unreferenced.
+            if (removed.getLocation().getDataFileId() != location.getDataFileId()) {
+                this.store.referenceRemovedLocation(tx, location, removed);
+            }
         }
     }
 
@@ -589,8 +593,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
                     // now that the job is removed from the index we can store the remove info and
                     // then dereference the log files that hold the initial add command and the most
-                    // recent update command.
-                    this.store.referenceRemovedLocation(tx, location, job);
+                    // recent update command.  If the remove and the add that created the job are in
+                    // the same file we don't need to track it and just let a normal GC of the logs
+                    // remove it when the log is unreferenced.
+                    if (job.getLocation().getDataFileId() != location.getDataFileId()) {
+                        this.store.referenceRemovedLocation(tx, location, job);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/de681b93/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 1a08931..f73b6a3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -397,22 +397,22 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
                 Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
                 List<Integer> orphans = new ArrayList<Integer>();
                 while (removals.hasNext()) {
-                    boolean orphanedRemve = true;
+                    boolean orphanedRemove = true;
                     Entry<Integer, List<Integer>> entry = removals.next();
 
                     // If this log is not a GC candidate then there's no need to do a check to rule it out
                     if (gcCandidateSet.contains(entry.getKey())) {
                         for (Integer addLocation : entry.getValue()) {
                             if (completeFileSet.contains(addLocation)) {
-                                orphanedRemve = false;
+                                LOG.trace("A remove in log {} has an add still in existance in {}.", entry.getKey(), addLocation);
+                                orphanedRemove = false;
                                 break;
                             }
                         }
 
                         // If it's not orphaned than we can't remove it, otherwise we
                         // stop tracking it it's log will get deleted on the next check.
-                        if (!orphanedRemve) {
-                            LOG.trace("A remove in log {} has an add still in existance.", entry.getKey());
+                        if (!orphanedRemove) {
                             gcCandidateSet.remove(entry.getKey());
                         } else {
                             LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());

http://git-wip-us.apache.org/repos/asf/activemq/blob/de681b93/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
index c013a4c..3685f34 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java
@@ -29,6 +29,7 @@ import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.Wait;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,6 +46,10 @@ public class JobSchedulerStoreCheckpointTest {
 
     @Before
     public void setUp() throws Exception {
+
+        // investigate gc issue - store usage not getting released
+        org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class).setLevel(Level.TRACE);
+
         File directory = new File("target/test/ScheduledJobsDB");
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
@@ -80,7 +85,7 @@ public class JobSchedulerStoreCheckpointTest {
     }
 
     @Test
-    public void test() throws Exception {
+    public void testStoreCleanupLinear() throws Exception {
         final int COUNT = 10;
         final CountDownLatch latch = new CountDownLatch(COUNT);
         scheduler.addListener(new JobListener() {
@@ -122,4 +127,39 @@ public class JobSchedulerStoreCheckpointTest {
 
         LOG.info("Number of journal log files: {}", getNumJournalFiles());
     }
+
+    @Test
+    public void testColocatedAddRemoveCleanup() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+
+        byte[] data = new byte[1024];
+        for (int i = 0; i < data.length; ++i) {
+            data[i] = (byte) (i % 256);
+        }
+
+        long time = TimeUnit.SECONDS.toMillis(2);
+        scheduler.schedule("Message-1", new ByteSequence(data), "", time, 0, 0);
+
+        assertTrue(latch.await(70, TimeUnit.SECONDS));
+        assertEquals(0, latch.getCount());
+
+        scheduler.schedule("Message-2", payload, "", time, 0, 0);
+        scheduler.schedule("Message-3", payload, "", time, 0, 0);
+
+        assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumJournalFiles() == 1;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+
+        LOG.info("Number of journal log files: {}", getNumJournalFiles());
+    }
 }


Mime
View raw message