activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1026373 - in /activemq/activemq-apollo/trunk: apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/ apollo-util/src/main/scala/org/apache/activemq/apollo/util/
Date Fri, 22 Oct 2010 15:26:43 GMT
Author: chirino
Date: Fri Oct 22 15:26:42 2010
New Revision: 1026373

URL: http://svn.apache.org/viewvc?rev=1026373&view=rev
Log:
More robust store start/stop logic.

Modified:
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala?rev=1026373&r1=1026372&r2=1026373&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala Fri Oct 22 15:26:42 2010
@@ -138,7 +138,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
   }
 
-  val schedule_version = new AtomicInteger()
 
   def start(onComplete:Runnable) = {
     lock {
@@ -207,15 +206,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       }
 
       recover(onComplete)
-
-      // Schedule periodic jobs.. they keep executing while schedule_version remains the same.
-      scheduleCleanup(schedule_version.get())
-      scheduleFlush(schedule_version.get())
     }
   }
 
   def stop() = {
-    schedule_version.incrementAndGet
     journal.close
     indexFileFactory.close
     lockFile.unlock
@@ -912,17 +906,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   //
   /////////////////////////////////////////////////////////////////////
 
-  def scheduleFlush(version:Int): Unit = {
-    def try_flush() = {
-      if (version == schedule_version.get) {
-        hawtDBStore.executor_pool {
-          flush
-          scheduleFlush(version)
-        }
-      }
-    }
-    dispatchQueue.dispatchAfter(config.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
-  }
 
   def flush() = {
     val start = System.currentTimeMillis()
@@ -933,25 +916,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
   }
 
-  def scheduleCleanup(version:Int): Unit = {
-    def try_cleanup() = {
-      if (version == schedule_version.get) {
-        hawtDBStore.executor_pool {
-          withTx {tx =>
-            cleanup(tx)
-          }
-          scheduleCleanup(version)
-        }
-      }
-    }
-    dispatchQueue.dispatchAfter(config.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+  def cleanup():Unit = withTx {tx =>
+    cleanup(tx)
   }
 
   /**
    * @param tx
    * @throws IOException
    */
-  def cleanup(tx:Transaction) = {
+  def cleanup(tx:Transaction):Unit = {
     val helper = new TxHelper(tx)
     import JavaConversions._
     import helper._

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala?rev=1026373&r1=1026372&r2=1026373&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala Fri Oct 22 15:26:42 2010
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.apollo.store.hawtdb
 
-import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
 import java.util.HashMap
 import collection.{Seq}
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import java.io.File
 import java.util.concurrent._
+import atomic.{AtomicInteger, AtomicLong}
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
@@ -65,9 +65,13 @@ class HawtDBStore extends DelayingStoreS
   var next_msg_key = new AtomicLong(1)
 
   var executor_pool:ExecutorService = _
+  val schedule_version = new AtomicInteger()
   var config:HawtDBStoreDTO = defaultConfig
   val client = new HawtDBClient(this)
 
+  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
+  load_source.setEventHandler(^{drain_loads});
+
   override def toString = "hawtdb store"
 
   def flush_delay = config.flush_delay
@@ -111,14 +115,44 @@ class HawtDBStore extends DelayingStoreS
       client.start(^{
         next_msg_key.set( client.rootBuffer.getLastMessageKey.longValue +1 )
         next_queue_key.set( client.rootBuffer.getLastQueueKey.longValue +1 )
+        val v = schedule_version.incrementAndGet
+        scheduleCleanup(v)
+        scheduleFlush(v)
+        load_source.resume
         onCompleted.run
       })
     }
   }
 
+  def scheduleFlush(version:Int): Unit = {
+    def try_flush() = {
+      if (version == schedule_version.get) {
+        executor_pool {
+          client.flush
+          scheduleFlush(version)
+        }
+      }
+    }
+    dispatchQueue.dispatchAfter(config.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
+  }
+
+  def scheduleCleanup(version:Int): Unit = {
+    def try_cleanup() = {
+      if (version == schedule_version.get) {
+        executor_pool {
+          client.cleanup()
+          scheduleCleanup(version)
+        }
+      }
+    }
+    dispatchQueue.dispatchAfter(config.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+  }
+
   protected def _stop(onCompleted: Runnable) = {
+    schedule_version.incrementAndGet
     new Thread() {
       override def run = {
+        load_source.suspend
         executor_pool.shutdown
         executor_pool.awaitTermination(86400, TimeUnit.SECONDS)
         executor_pool = null
@@ -181,11 +215,6 @@ class HawtDBStore extends DelayingStoreS
     }
   }
 
-  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
-  load_source.setEventHandler(^{drain_loads});
-  load_source.resume
-
-
   def loadMessage(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala?rev=1026373&r1=1026372&r2=1026373&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala Fri Oct 22 15:26:42 2010
@@ -233,7 +233,7 @@ trait DispatchLogging extends Logging {
 
   override protected def log_map(message:String) = {
     val d = getCurrentQueue
-    if( d!=null ) {
+    if( d!=null && d.getLabel!=null ) {
       d.getLabel+" | "+message
     } else {
       message



Mime
View raw message