activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1234067 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb: dto/LevelDBStoreDTO.java leveldb/LevelDBClient.scala
Date Fri, 20 Jan 2012 18:52:46 GMT
Author: chirino
Date: Fri Jan 20 18:52:46 2012
New Revision: 1234067

URL: http://svn.apache.org/viewvc?rev=1234067&view=rev
Log:
Fixes APLO-134: LevelDB store should use a file lock so that a broker instance gets exclusive access
to the store data.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1234067&r1=1234066&r2=1234067&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
Fri Jan 20 18:52:46 2012
@@ -42,6 +42,9 @@ public class LevelDBStoreDTO extends Sto
 
     @XmlAttribute(name="paranoid_checks")
     public Boolean paranoid_checks;
+    
+    @XmlAttribute(name="fail_if_locked")
+    public Boolean fail_if_locked;
 
     @XmlAttribute(name="verify_checksums")
     public Boolean verify_checksums;

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234067&r1=1234066&r2=1234067&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Fri Jan 20 18:52:46 2012
@@ -282,12 +282,17 @@ class LevelDBClient(store: LevelDBStore)
       }
     }
 
+    lock_file = new LockFile(directory / "lock", true)
+
     def time[T](func: =>T):Long = {
       val start = System.nanoTime()
       func  
       System.nanoTime() - start 
     }
-    
+
+    // Lock before we open anything..
+    lock_store
+
     val log_open_duration = time {
       retry {
         log.open
@@ -505,7 +510,23 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-
+  var lock_file:LockFile = _
+  
+  def lock_store = {
+    import OptionSupport._
+    if (config.fail_if_locked.getOrElse(false)) {
+      lock_file.lock()
+    } else {
+      retry {
+        lock_file.lock()
+      }
+    }
+  }
+  
+  def unlock_store = {
+    lock_file.unlock()
+  }
+  
   private def store_log_refs = {
     index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
   }
@@ -530,6 +551,7 @@ class LevelDBClient(store: LevelDBStore)
     }
     copy_dirty_index_to_snapshot
     log = null
+    unlock_store
   }
 
   def using_index[T](func: =>T):T = {
@@ -639,7 +661,7 @@ class LevelDBClient(store: LevelDBStore)
 
       if (!rc.isDefined) {
         // We may need to give up if the store is being stopped.
-        if ( !store.service_state.is_started ) {
+        if ( !store.service_state.is_starting_or_started ) {
           throw error
         }
         Thread.sleep(1000)



Mime
View raw message