activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1063710 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Date Wed, 26 Jan 2011 13:30:42 GMT
Author: gtully
Date: Wed Jan 26 13:30:42 2011
New Revision: 1063710

URL: http://svn.apache.org/viewvc?rev=1063710&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3161 - Race condition in ActiveMQ Journal Checkpoint
worker thread cleanup leads to multiple running instances
patch applied with thanks, with one small modification: left the unsynchronized check in store(..) so that locking
only occurs if the thread needs a restart

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1063710&r1=1063709&r2=1063710&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java	Wed Jan 26 13:30:42 2011
@@ -213,6 +213,7 @@ public class MessageDatabase extends Ser
     private boolean checksumJournalFiles = false;
     private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
     protected boolean forceRecoverIndex = false;
+    private final Object checkpointThreadLock = new Object();
 
     public MessageDatabase() {
     }
@@ -273,38 +274,49 @@ public class MessageDatabase extends Ser
 	}
 	
 	private void startCheckpoint() {
-        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-            @Override
-            public void run() {
-                try {
-                    long lastCleanup = System.currentTimeMillis();
-                    long lastCheckpoint = System.currentTimeMillis();
-                    // Sleep for a short time so we can periodically check 
-                    // to see if we need to exit this thread.
-                    long sleepTime = Math.min(checkpointInterval, 500);
-                    while (opened.get()) {
-                        Thread.sleep(sleepTime);
-                        long now = System.currentTimeMillis();
-                        if( now - lastCleanup >= cleanupInterval ) {
-                            checkpointCleanup(true);
-                            lastCleanup = now;
-                            lastCheckpoint = now;
-                        } else if( now - lastCheckpoint >= checkpointInterval ) {
-                            checkpointCleanup(false);
-                            lastCheckpoint = now;
+        synchronized (checkpointThreadLock) {
+            boolean start = false;
+            if (checkpointThread == null) {
+                start = true;
+            } else if (!checkpointThread.isAlive()) {
+                start = true;
+                LOG.info("KahaDB: Recovering checkpoint thread after death");
+            }
+            if (start) {
+                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+                    @Override
+                    public void run() {
+                        try {
+                            long lastCleanup = System.currentTimeMillis();
+                            long lastCheckpoint = System.currentTimeMillis();
+                            // Sleep for a short time so we can periodically check
+                            // to see if we need to exit this thread.
+                            long sleepTime = Math.min(checkpointInterval, 500);
+                            while (opened.get()) {
+                                Thread.sleep(sleepTime);
+                                long now = System.currentTimeMillis();
+                                if( now - lastCleanup >= cleanupInterval ) {
+                                    checkpointCleanup(true);
+                                    lastCleanup = now;
+                                    lastCheckpoint = now;
+                                } else if( now - lastCheckpoint >= checkpointInterval ) {
+                                    checkpointCleanup(false);
+                                    lastCheckpoint = now;
+                                }
+                            }
+                        } catch (InterruptedException e) {
+                            // Looks like someone really wants us to exit this thread...
+                        } catch (IOException ioe) {
+                            LOG.error("Checkpoint failed", ioe);
+                            brokerService.handleIOException(ioe);
                         }
                     }
-                } catch (InterruptedException e) {
-                    // Looks like someone really wants us to exit this thread...
-                } catch (IOException ioe) {
-                    LOG.error("Checkpoint failed", ioe);
-                    brokerService.handleIOException(ioe);
-                }
+                };
+
+                checkpointThread.setDaemon(true);
+                checkpointThread.start();
             }
-                    
-        };
-        checkpointThread.setDaemon(true);
-        checkpointThread.start();
+        }
 	}
 
 	public void open() throws IOException {
@@ -378,7 +390,9 @@ public class MessageDatabase extends Ser
 	            this.indexLock.writeLock().unlock();
 	        }
 	        journal.close();
-	        checkpointThread.join();
+            synchronized (checkpointThreadLock) {
+	            checkpointThread.join();
+            }
 	        lockFile.unlock();
 	        lockFile=null;
 		}
@@ -761,7 +775,6 @@ public class MessageDatabase extends Ser
                 this.indexLock.writeLock().unlock();
             }
             if (!checkpointThread.isAlive()) {
-                LOG.info("KahaDB: Recovering checkpoint thread after exception");
                 startCheckpoint();
             }
             if (after != null) {



Mime
View raw message