activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-4705 - close window where both master and slave can be active - slave waits for possible keepAlivePeriod with the lock before startin. Ensures a keepAlive has completed on the master
Date Fri, 22 May 2015 10:41:09 GMT
Repository: activemq
Updated Branches:
  refs/heads/master cc36633ce -> 9bc602be4


https://issues.apache.org/jira/browse/AMQ-4705 - close window where both master and slave
can be active - slave waits for possible keepAlivePeriod with the lock before startin. Ensures
a keepAlive has completed on the master


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9bc602be
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9bc602be
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9bc602be

Branch: refs/heads/master
Commit: 9bc602be43e9ce5f8747a0ac9eafe09ee40b0ac6
Parents: cc36633
Author: gtully <gary.tully@gmail.com>
Authored: Fri May 22 11:40:25 2015 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri May 22 11:40:40 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/store/SharedFileLocker.java |  10 +-
 .../activemq/store/SharedFileLockerTest.java    | 287 ++++++++++++-------
 2 files changed, 195 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9bc602be/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
b/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
index 36ebe62..56e7bde 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
@@ -18,6 +18,7 @@ package org.apache.activemq.store;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.AbstractLocker;
 import org.apache.activemq.util.LockFile;
@@ -54,6 +55,13 @@ public class SharedFileLocker extends AbstractLocker {
                 while ((!isStopped()) && (!isStopping())) {
                     try {
                         lockFile.lock();
+                        if (warned) {
+                            // ensure lockHolder has released; wait for one keepAlive iteration
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(lockable != null ? lockable.getLockKeepAlivePeriod()
: 0l);
+                            } catch (InterruptedException e1) {
+                            }
+                        }
                         locked = keepAlive();
                         break;
                     } catch (IOException e) {
@@ -72,7 +80,7 @@ public class SharedFileLocker extends AbstractLocker {
                                     + " seconds for the database to be unlocked. Reason:
"
                                     + e);
                         try {
-                            Thread.sleep(lockAcquireSleepInterval);
+                            TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
                         } catch (InterruptedException e1) {
                         }
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9bc602be/activemq-unit-tests/src/test/java/org/apache/activemq/store/SharedFileLockerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/SharedFileLockerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/SharedFileLockerTest.java
index ba768c0..c5234fb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/SharedFileLockerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/SharedFileLockerTest.java
@@ -18,155 +18,240 @@
 package org.apache.activemq.store;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.LockFile;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.Wait;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.LoggerFactory;
+
+
+import static junit.framework.Assert.assertTrue;
 
-public class SharedFileLockerTest
-{
-   @Rule
-   public TemporaryFolder testFolder;
+public class SharedFileLockerTest {
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(SharedFileLockerTest.class);
 
+    @Rule
+    public TemporaryFolder testFolder;
 
-   public SharedFileLockerTest()
-   {
-      File file = new File(IOHelper.getDefaultDataDirectory());
-      file.mkdir();
 
-      // TemporaryFolder will make sure the files are removed after the test is done
-      testFolder = new TemporaryFolder(file);
+    public SharedFileLockerTest() {
+        File file = new File(IOHelper.getDefaultDataDirectory());
+        file.mkdir();
 
-   }
+        // TemporaryFolder will make sure the files are removed after the test is done
+        testFolder = new TemporaryFolder(file);
 
-   @Test
-   public void testLoop() throws Exception
-   {
-      // Increase the number of iterations if you are debugging races
-      for (int i = 0 ; i < 100; i++)
-      {
-         internalLoop(5);
-      }
+    }
 
-   }
+    @Test
+    public void testLoop() throws Exception {
+        // Increase the number of iterations if you are debugging races
+        for (int i = 0; i < 100; i++) {
+            internalLoop(5);
+        }
 
+    }
 
-   @Test
-   public void testLogging() throws Exception
-   {
-      // using a bigger wait here
-      // to make sure we won't log any extra info
-      internalLoop(100);
-   }
 
-   private void internalLoop(long timewait) throws Exception
-   {
-      final AtomicInteger logCounts = new AtomicInteger(0);
-      DefaultTestAppender appender = new DefaultTestAppender() {
-         @Override
-         public void doAppend(LoggingEvent event) {
-            logCounts.incrementAndGet();
-         }
-      };
+    @Test
+    public void testLogging() throws Exception {
+        // using a bigger wait here
+        // to make sure we won't log any extra info
+        internalLoop(100);
+    }
+
+    private void internalLoop(long timewait) throws Exception {
+        final AtomicInteger logCounts = new AtomicInteger(0);
+        DefaultTestAppender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                logCounts.incrementAndGet();
+            }
+        };
+
+        Logger.getRootLogger().addAppender(appender);
 
-      Logger.getRootLogger().addAppender(appender);
+        final AtomicInteger errors = new AtomicInteger(0);
 
-      final AtomicInteger errors = new AtomicInteger(0);
+        Thread thread = null;
 
-      Thread thread = null;
+        SharedFileLocker locker1 = new SharedFileLocker();
+        locker1.setDirectory(testFolder.getRoot());
 
-      SharedFileLocker locker1 = new SharedFileLocker();
-      locker1.setDirectory(testFolder.getRoot());
+        final SharedFileLocker locker2 = new SharedFileLocker();
+        locker2.setLockAcquireSleepInterval(1);
+        locker2.setDirectory(testFolder.getRoot());
 
-      final SharedFileLocker locker2 = new SharedFileLocker();
-      locker2.setLockAcquireSleepInterval(1);
-      locker2.setDirectory(testFolder.getRoot());
 
+        try {
+            locker1.doStart();
 
-      try
-      {
-         locker1.doStart();
+            Assert.assertTrue(locker1.keepAlive());
 
-         Assert.assertTrue(locker1.keepAlive());
+            thread = new Thread("Locker Thread") {
+                public void run() {
+                    try {
+                        locker2.doStart();
+                    } catch (Throwable e) {
+                        errors.incrementAndGet();
+                    }
+                }
+            };
 
-         thread = new Thread("Locker Thread")
-         {
-            public void run()
+            thread.start();
+
+            // I need to make sure the info was already logged
+            // but I don't want to have an unecessary wait here,
+            // as I want the test to run as fast as possible
             {
-               try
-               {
-                  locker2.doStart();
-               }
-               catch (Throwable e)
-               {
-                  errors.incrementAndGet();
-               }
+                long timeout = System.currentTimeMillis() + 5000;
+                while (logCounts.get() < 1 && System.currentTimeMillis() <
timeout) {
+                    Thread.sleep(1);
+                }
+            }
+
+            if (timewait > 0) {
+                Thread.sleep(timewait);
             }
-         };
 
-         thread.start();
+            Assert.assertTrue(thread.isAlive());
+
+            locker1.stop();
+
+            // 10 seconds here is an eternity, but it should only take milliseconds
+            thread.join(5000);
+
+            Assert.assertEquals("Extra logs in place", 1, logCounts.get());
 
-         // I need to make sure the info was already logged
-         // but I don't want to have an unecessary wait here,
-         // as I want the test to run as fast as possible
-         {
             long timeout = System.currentTimeMillis() + 5000;
-            while (logCounts.get() < 1 && System.currentTimeMillis() < timeout)
-            {
-               Thread.sleep(1);
+
+            while (timeout > System.currentTimeMillis() && !locker2.keepAlive())
{
+                Thread.sleep(1);
             }
-         }
 
-         if (timewait > 0)
-         {
-            Thread.sleep(timewait);
-         }
+            Assert.assertTrue(locker2.keepAlive());
 
-         Assert.assertTrue(thread.isAlive());
+            locker2.stop();
 
-         locker1.stop();
+            Assert.assertEquals(0, errors.get());
 
-         // 10 seconds here is an eternity, but it should only take milliseconds
-         thread.join(5000);
+        } finally {
 
-         Assert.assertEquals("Extra logs in place", 1, logCounts.get());
 
-         long timeout = System.currentTimeMillis() + 5000;
+            Logger.getRootLogger().removeAppender(appender);
 
-         while (timeout > System.currentTimeMillis() && !locker2.keepAlive())
-         {
-            Thread.sleep(1);
-         }
+            // to make sure we won't leak threads if the test ever failed for any reason
+            thread.join(1000);
+            if (thread.isAlive()) {
+                thread.interrupt();
+            }
 
-         Assert.assertTrue(locker2.keepAlive());
+            File lockFile = new File(testFolder.getRoot(), "lock");
+            lockFile.delete();
+        }
 
-         locker2.stop();
+    }
 
-         Assert.assertEquals(0, errors.get());
+    @Test
+    public void verifyLockAcquireWaitsForLockDrop() throws Exception {
 
-      }
-      finally
-      {
+        final AtomicInteger logCounts = new AtomicInteger(0);
+        DefaultTestAppender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                logCounts.incrementAndGet();
+            }
+        };
+        Logger sharedFileLogger = Logger.getLogger(SharedFileLocker.class);
+        sharedFileLogger.addAppender(appender);
 
+        LockableServiceSupport config = new LockableServiceSupport() {
 
-         Logger.getRootLogger().removeAppender(appender);
+            @Override
+            public long getLockKeepAlivePeriod() {
+                return 500;
+            }
 
-         // to make sure we won't leak threads if the test ever failed for any reason
-         thread.join(1000);
-         if (thread.isAlive())
-         {
-            thread.interrupt();
-         }
+            @Override
+            public Locker createDefaultLocker() throws IOException {
+                return null;
+            }
 
-         File lockFile = new File(testFolder.getRoot(), "lock");
-         lockFile.delete();
-      }
+            public void init() throws Exception {
+            }
 
-   }
+            protected void doStop(ServiceStopper stopper) throws Exception {
+            }
+
+            protected void doStart() throws Exception {
+            }
+        };
+
+        final SharedFileLocker underTest = new SharedFileLocker();
+        underTest.setDirectory(testFolder.getRoot());
+        underTest.setLockAcquireSleepInterval(5);
+        underTest.setLockable(config);
+
+        // get the in jvm lock
+        File lockFile = new File(testFolder.getRoot(), "lock");
+        String jvmProp = LockFile.class.getName() + ".lock." + lockFile.getCanonicalPath();
+        System.getProperties().put(jvmProp, jvmProp);
+
+        final CountDownLatch locked = new CountDownLatch(1);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            final AtomicLong acquireTime = new AtomicLong(0l);
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        underTest.start();
+                        acquireTime.set(System.currentTimeMillis());
+                        locked.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+
+            assertTrue("locker failed to obtain lock", Wait.waitFor(new Wait.Condition()
{
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return logCounts.get() > 0;
+                }
+            }, 5000, 10));
+
+            // release vm lock
+            long releaseTime = System.currentTimeMillis();
+            System.getProperties().remove(jvmProp);
+
+            assertTrue("locker got lock", locked.await(5, TimeUnit.SECONDS));
+
+            // verify delay in start
+            LOG.info("ReleaseTime: " + releaseTime + ", AcquireTime:" + acquireTime.get());
+            assertTrue("acquire delayed for keepAlive: " + config.getLockKeepAlivePeriod(),
acquireTime.get() >= releaseTime + config.getLockKeepAlivePeriod());
+
+        } finally {
+            executorService.shutdownNow();
+            underTest.stop();
+            lockFile.delete();
+            sharedFileLogger.removeAppender(appender);
+        }
+    }
 }


Mime
View raw message