activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject [05/14] git commit: Have the stomp protocol keep better track of in progress commits to aid in debugging.
Date Fri, 11 Oct 2013 19:14:09 GMT
Have the stomp protocol keep better track of in progress commits to aid in debugging.

git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-apollo/trunk@1508925 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/29c60eab
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/29c60eab
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/29c60eab

Branch: refs/heads/trunk
Commit: 29c60eab89386875570d3805223fca0fad93d3e3
Parents: c96242e
Author: Hiram R. Chirino <chirino@apache.org>
Authored: Wed Jul 31 15:30:59 2013 +0000
Committer: Hiram R. Chirino <chirino@apache.org>
Committed: Wed Jul 31 15:30:59 2013 +0000

----------------------------------------------------------------------
 .../apollo/stomp/StompProtocolHandler.scala     | 58 ++++++++++++--------
 1 file changed, 36 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/29c60eab/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
index 43f6ab8..9bd8c15 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
@@ -1212,9 +1212,14 @@ class StompProtocolHandler extends ProtocolHandler {
           case None=>
             perform_send(frame)
           case Some(txid)=>
-            get_or_create_tx_queue(txid).add { uow=>
-              perform_send(frame, uow)
-            }
+            get_or_create_tx_queue(txid).add (new TransactionAction(){
+              override def on_commit(uow: StoreUOW) {
+                perform_send(frame, uow)
+              }
+              override def toString: String = {
+                "send to: "+dest
+              }
+            })
         }
 
     }
@@ -1715,11 +1720,17 @@ class StompProtocolHandler extends ProtocolHandler {
             handler.perform_ack(consumed, messageId, null)
           case Some(txid)=>
             handler.consumer.retain()
-            get_or_create_tx_queue(txid).add({ uow=>
-              handler.perform_ack(consumed, messageId, uow)
-              handler.consumer.release()
-            }, ()=>{
-              handler.consumer.release()
+            get_or_create_tx_queue(txid).add (new TransactionAction(){
+              override def on_commit(uow: StoreUOW) {
+                handler.perform_ack(consumed, messageId, uow)
+                handler.consumer.release()
+              }
+              override def on_rollback() {
+                handler.consumer.release()
+              }
+              override def toString: String = {
+                "ack: "+messageId
+              }
             })
         }
       }
@@ -1737,7 +1748,10 @@ class StompProtocolHandler extends ProtocolHandler {
   }
 
   def on_stomp_commit(headers:HeaderMap) = {
-    remove_tx_queue(require_transaction_header(headers)).commit {
+    val txid = require_transaction_header(headers)
+    val tx = transactions.get(txid).getOrElse(die("transaction not active: %d".format(txid)))
+    tx.commit {
+      remove_tx_queue(txid)
       send_receipt(headers)
     }
   }
@@ -1767,25 +1781,29 @@ class StompProtocolHandler extends ProtocolHandler {
     frame
   }
 
+  class TransactionAction {
+    def on_commit(uow:StoreUOW):Unit = {}
+    def on_rollback() = {}
+  }
 
   class TransactionQueue {
     // TODO: eventually we want to back this /w a broker Queue which
     // can provides persistence and memory swapping.
 
-    val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
-
+    val queue = ListBuffer[TransactionAction]()
+    var uow:StoreUOW = _
 
     override def toString: String = {
-      "{ actions: "+queue.size+" }"
+      "{ uow: "+uow+", actions: "+queue+" }"
     }
 
-    def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
-      queue += ((on_commit, on_rollback))
+    def add(action:TransactionAction):Unit = {
+      queue += action
     }
 
     def commit(on_complete: => Unit) = {
       if( host.store!=null ) {
-        val uow = host.store.create_uow
+        uow = host.store.create_uow
 //        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
         uow.on_complete {
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
@@ -1793,20 +1811,16 @@ class StompProtocolHandler extends ProtocolHandler {
             on_complete
           }
         }
-        queue.foreach{ _._1(uow) }
+        queue.foreach{ _.on_commit(uow) }
         uow.release
       } else {
-        queue.foreach{ _._1(null) }
+        queue.foreach{ _.on_commit(null) }
         on_complete
       }
     }
 
     def rollback = {
-      queue.foreach{ case (x, y) =>
-        if( y != null ) {
-          y()
-        }
-      }
+      queue.foreach{ _.on_rollback() }
     }
 
   }


Mime
View raw message