activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1421345 - in /activemq/activemq-apollo/trunk/apollo-amqp/src: main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
Date Thu, 13 Dec 2012 15:43:36 GMT
Author: chirino
Date: Thu Dec 13 15:43:35 2012
New Revision: 1421345

URL: http://svn.apache.org/viewvc?rev=1421345&view=rev
Log:
Improve the transaction handling.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1421345&r1=1421344&r2=1421345&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Thu Dec 13 15:43:35 2012
@@ -862,15 +862,15 @@ class AmqpProtocolHandler extends Protoc
         case state:TransactionalState =>
           transactions.get(toLong(state.getTxnId())) match {
             case Some(tx) =>
-              tx.add({ uow =>
+              tx.add((uow)=>{
                 d.uow = uow
                 val accepted = producer_overflow.offer(d)
                 assert(accepted)
-                receiver.advance();
               })
             case None =>
               die("uknown-tx", "txid in the delivery remote state is invalid")
           }
+          receiver.advance();
         case _ =>
           val accepted = producer_overflow.offer(d)
           assert(accepted)
@@ -1109,9 +1109,11 @@ class AmqpProtocolHandler extends Protoc
         case state:TransactionalState =>
           transactions.get(toLong(state.getTxnId())) match {
             case Some(tx) =>
-              tx.add({ uow =>
+              tx.add((uow)=>{
                   process(proton_delivery, state.getOutcome, uow)
-              })
+              }
+//                , ()=>{ settle(proton_delivery, null, true, null) }
+              )
             case None =>
               die("uknown-tx", "txid in the delivery remote state is invalid")
           }
@@ -1324,12 +1326,10 @@ class AmqpProtocolHandler extends Protoc
 
           val tx_queue = remove_tx_queue(txid);
           if (discharge.getFail()) {
-            System.out.println("rollback transaction " + txid);
             tx_queue.rollback
             delivery.settle();
             pump_out
           } else {
-            System.out.println("commit transaction " + txid);
             tx_queue.commit {
               queue {
                 delivery.settle();

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala?rev=1421345&r1=1421344&r2=1421345&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
Thu Dec 13 15:43:35 2012
@@ -65,6 +65,35 @@ class QpidJmsTest extends AmqpTestSuppor
     return connection
   }
 
+  test("Transaction Test") {
+
+    val default_connection = createConnection()
+    val destination = new QueueImpl("queue://txtest")
+    val session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
+    var consumer = session.createConsumer(destination)
+    var producer = session.createProducer(destination)
+
+    var msg = session.createTextMessage("1")
+    producer.send(msg)
+    session.commit()
+
+    msg.setText("2")
+    producer.send(msg)
+    session.rollback()
+
+    msg.setText("3")
+    producer.send(msg)
+    session.commit()
+
+    receive_text(consumer) should equal("1")
+    session.commit()
+    receive_text(consumer) should equal("3")
+    session.rollback()
+    receive_text(consumer) should equal("3")
+    session.commit()
+
+  }
+
   test("NoLocal Test") {
 
     val default_connection = createConnection("clientid")



Mime
View raw message