activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1390551 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
Date Wed, 26 Sep 2012 15:15:16 GMT
Author: chirino
Date: Wed Sep 26 15:15:15 2012
New Revision: 1390551

URL: http://svn.apache.org/viewvc?rev=1390551&view=rev
Log:
Errors could occur in the store due to an invalid double dequeue from the store.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1390551&r1=1390550&r2=1390551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Wed Sep 26 15:15:15 2012
@@ -197,23 +197,26 @@ class QueueEntry(val queue:Queue, val se
   def load(space:MemorySpace) = state.swap_in(space)
   def remove = state.remove
 
-  def dequeue(uow: StoreUOW) = {
+  var queued = true
 
-    if (messageKey != -1) {
-      val storeBatch = if( uow == null ) {
-        queue.virtual_host.store.create_uow
-      } else {
-        uow
-      }
-      storeBatch.dequeue(toQueueEntryRecord)
-      if( uow == null ) {
-        storeBatch.release
+  def dequeue(uow: StoreUOW) = {
+    if ( queued ) {
+      if (messageKey != -1) {
+        val storeBatch = if( uow == null ) {
+          queue.virtual_host.store.create_uow
+        } else {
+          uow
+        }
+        storeBatch.dequeue(toQueueEntryRecord)
+        if( uow == null ) {
+          storeBatch.release
+        }
       }
+      queue.dequeue_item_counter += 1
+      queue.dequeue_size_counter += size
+      queue.dequeue_ts = queue.now
+      queued = false
     }
-
-    queue.dequeue_item_counter += 1
-    queue.dequeue_size_counter += size
-    queue.dequeue_ts = queue.now
   }
 
 
@@ -668,11 +671,6 @@ class QueueEntry(val queue:Queue, val se
 
         // We can drop after dispatch in some cases.
         if( queue.is_topic_queue  && parked.isEmpty && getPrevious.is_head
) {
-          if (messageKey != -1) {
-            val storeBatch = queue.virtual_host.store.create_uow
-            storeBatch.dequeue(toQueueEntryRecord)
-            storeBatch.release
-          }
           dequeue(null)
           remove
         }

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala?rev=1390551&r1=1390550&r2=1390551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
Wed Sep 26 15:15:15 2012
@@ -53,7 +53,7 @@ class MqttLoadTest extends MqttTestSuppo
   }
 }
 
-// This test is failing with: java.lang.AssertionError: assertion failed: locator_based.unary_$bang.$bar$bar(uow.have_locators)
-//class MqttLoadLevelDBTest extends MqttLoadTest {
-//  override def broker_config_uri = "xml:classpath:apollo-mqtt-leveldb.xml"
-//}
+//This test is failing with: java.lang.AssertionError: assertion failed: locator_based.unary_$bang.$bar$bar(uow.have_locators)
+class MqttLoadLevelDBTest extends MqttLoadTest {
+  override def broker_config_uri = "xml:classpath:apollo-mqtt-leveldb.xml"
+}



Mime
View raw message