activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r741597 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Date Fri, 06 Feb 2009 15:42:22 GMT
Author: chirino
Date: Fri Feb  6 15:42:22 2009
New Revision: 741597

URL: http://svn.apache.org/viewvc?rev=741597&view=rev
Log:
better synchronization of the metadata.lastUpdate var

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741597&r1=741596&r2=741597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb  6 15:42:22 2009
@@ -356,44 +356,48 @@
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
-        long start = System.currentTimeMillis();
-        
-        Location recoveryPosition = getRecoveryPosition();
-        if( recoveryPosition ==null ) {
-        	return;
-        }
-        
-        int redoCounter = 0;
-        LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
-
-        while (recoveryPosition != null) {
-            JournalCommand message = load(recoveryPosition);
-            metadata.lastUpdate = recoveryPosition;
-            process(message, recoveryPosition);
-            redoCounter++;
-            recoveryPosition = journal.getNextLocation(recoveryPosition);
+        synchronized (indexMutex) {
+	        long start = System.currentTimeMillis();
+	        
+	        Location recoveryPosition = getRecoveryPosition();
+	        if( recoveryPosition ==null ) {
+	        	return;
+	        }
+	        
+	        int redoCounter = 0;
+	        LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
+	
+	        while (recoveryPosition != null) {
+	            JournalCommand message = load(recoveryPosition);
+	            metadata.lastUpdate = recoveryPosition;
+	            process(message, recoveryPosition);
+	            redoCounter++;
+	            recoveryPosition = journal.getNextLocation(recoveryPosition);
+	        }
+	        long end = System.currentTimeMillis();
+	        LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
         }
-        long end = System.currentTimeMillis();
-        LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
     }
     
 	private Location nextRecoveryPosition;
 	private Location lastRecoveryPosition;
 
 	public void incrementalRecover() throws IOException {
-        if( nextRecoveryPosition == null ) {
-        	if( lastRecoveryPosition==null ) {
-        		nextRecoveryPosition = getRecoveryPosition();
-        	} else {
-                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
-        	}        	
-        }
-        while (nextRecoveryPosition != null) {
-        	lastRecoveryPosition = nextRecoveryPosition;
-            metadata.lastUpdate = lastRecoveryPosition;
-            JournalCommand message = load(lastRecoveryPosition);
-            process(message, lastRecoveryPosition);            
-            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+        synchronized (indexMutex) {
+	        if( nextRecoveryPosition == null ) {
+	        	if( lastRecoveryPosition==null ) {
+	        		nextRecoveryPosition = getRecoveryPosition();
+	        	} else {
+	                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+	        	}        	
+	        }
+	        while (nextRecoveryPosition != null) {
+	        	lastRecoveryPosition = nextRecoveryPosition;
+	            metadata.lastUpdate = lastRecoveryPosition;
+	            JournalCommand message = load(lastRecoveryPosition);
+	            process(message, lastRecoveryPosition);            
+	            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+	        }
         }
 	}
 	
@@ -482,7 +486,9 @@
     		LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
     	}
 
-        metadata.lastUpdate = location;
+        synchronized (indexMutex) {
+        	metadata.lastUpdate = location;
+        }
         return location;
     }
 



Mime
View raw message