activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1486242 - in /activemq/trunk/activemq-leveldb-store/src/main: java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java scala/org/apache/activemq/leveldb/DBManager.scala scala/org/apache/activemq/leveldb/LevelDBStore.scala
Date Fri, 24 May 2013 22:33:32 GMT
Author: chirino
Date: Fri May 24 22:33:32 2013
New Revision: 1486242

URL: http://svn.apache.org/r1486242
Log:
This should fixed the problem with the delayed leveldb index updates.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java?rev=1486242&r1=1486241&r2=1486242&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
Fri May 24 22:33:32 2013
@@ -108,7 +108,4 @@ public interface LevelDBStoreViewMBean {
     @MBeanInfo("Compacts disk usage")
     void compact();
 
-    @MBeanInfo("Are delayed index updates occurring?")
-    boolean getDelayedIndexUpdates();
-
 }

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1486242&r1=1486241&r2=1486242&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
Fri May 24 22:33:32 2013
@@ -331,7 +331,6 @@ class DelayableUOW(val manager:DBManager
       val s = size
       if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
         asyncCapacityUsed = s
-        countDownFuture.set(null)
         manager.parent.blocking_executor.execute(^{
           complete_listeners.foreach(_())
         })
@@ -353,11 +352,11 @@ class DelayableUOW(val manager:DBManager
         asyncCapacityUsed = 0
       } else {
         manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
-        countDownFuture.set(null)
         manager.parent.blocking_executor.execute(^{
           complete_listeners.foreach(_())
         })
       }
+      countDownFuture.set(null)
 
       for( (id, action) <- actions ) {
         if( !action.enqueues.isEmpty ) {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1486242&r1=1486241&r2=1486242&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Fri May 24 22:33:32 2013
@@ -98,7 +98,6 @@ class LevelDBStoreView(val store:LevelDB
   def getParanoidChecks = paranoidChecks
   def getSync = sync
   def getVerifyChecksums = verifyChecksums
-  def getDelayedIndexUpdates = delayedIndexUpdates
 
   def getUowClosedCounter = db.uowClosedCounter
   def getUowCanceledCounter = db.uowCanceledCounter
@@ -184,7 +183,6 @@ class LevelDBStore extends LockableServi
   val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
   val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
   val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
-  var delayedIndexUpdates = false
 
   def init() = {}
 
@@ -708,35 +706,9 @@ class LevelDBStore extends LockableServi
     }
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit =
{
-      var found = false
-      var counter = 0;
-      while( !found ) {
         val limiting = LimitingRecoveryListener(maxReturned, listener)
         val excluding = PreparedExcluding(limiting)
         cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
-        if( limiting.recovered > 0 ) {
-          if( !delayedIndexUpdates && counter>0 ) {
-            info("This machine seems to have delayed index updates.")
-            delayedIndexUpdates = true
-          }
-          found = true
-        } else {
-          // Seems like on some systems it takes a while for leveldb index updates
-          // to become visible for read.  Need to figure out why this is, but until
-          // then, lets loop until we can read it.
-          if( counter > 10 ) {
-            found = true
-          } else {
-            counter+=1
-            // lets try to sync up /w the write thread..
-            val t = new CountDownLatch(1)
-            client.writeExecutorExec {
-              t.countDown()
-            }
-            t.await()
-          }
-        }
-      }
     }
 
     override def setBatch(id: MessageId): Unit = {



Mime
View raw message