activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1416616 - in /activemq/activemq-apollo/trunk: ./ apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-amqp/src/test/resources/ apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ apollo-amqp/src/test/scala/org/a...
Date Mon, 03 Dec 2012 18:19:14 GMT
Author: chirino
Date: Mon Dec  3 18:19:11 2012
New Revision: 1416616

URL: http://svn.apache.org/viewvc?rev=1416616&view=rev
Log:
Implemented AMQP support for: Transactions, Durable Subscriptions, and Selectors.  Passes
JORAM tests when run against trunk version of the AMQP JMS client.

Added:
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/test.properties
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-scala/pom.xml
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1416616&r1=1416615&r2=1416616&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Mon Dec  3 18:19:11 2012
@@ -30,25 +30,27 @@ import path.LiteralPart
 import protocol.ProtocolHandler
 import org.apache.activemq.apollo.broker.security.SecurityContext
 import org.apache.activemq.apollo.amqp.dto._
-import org.apache.qpid.proton.engine
-import org.apache.qpid.proton.engine.impl.{ProtocolTracer, DeliveryImpl, LinkImpl, TransportImpl}
-import org.apache.qpid.proton.engine._
 import org.fusesource.hawtbuf.Buffer._
-import org.apache.qpid.proton.`type`.transaction.{TransactionalState, Coordinator}
-import org.apache.qpid.proton.`type`.messaging.{Accepted, Data, Source, Target, Modified}
 import org.apache.activemq.apollo.broker.Delivery
 import org.apache.activemq.apollo.filter.{FilterException, BooleanExpression}
-import org.apache.qpid.proton.`type`.{Symbol => AmqpSymbol, Binary, DescribedType}
 import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.qpid.proton.`type`.transport.SenderSettleMode
 import java.util
 import java.io.IOException
-import scala.Some
+import org.apache.activemq.apollo.broker.Session
+import store.StoreUOW
 import org.apache.activemq.apollo.broker.FullSink
 import org.apache.activemq.apollo.broker.SubscriptionAddress
-import org.apache.activemq.apollo.broker.Session
+
+import org.apache.qpid.proton.engine
 import org.apache.qpid.proton.framing.TransportFrame
 import org.apache.qpid.proton.hawtdispatch.impl.{AmqpListener, AmqpTransport, AmqpProtocolCodec}
+import org.apache.qpid.proton.engine._
+import org.apache.qpid.proton.engine.impl.{ProtocolTracer, DeliveryImpl, LinkImpl, TransportImpl}
+import org.apache.qpid.proton.{`type` => proton_type}
+import proton_type.{Symbol => AmqpSymbol, UnsignedInteger, Binary, DescribedType}
+import proton_type.transport.SenderSettleMode
+import proton_type.messaging._
+import proton_type.transaction._
 
 object AmqpProtocolHandler extends Log {
 
@@ -68,7 +70,12 @@ object AmqpProtocolHandler extends Log {
   DEFAULT_DETINATION_PARSER.any_child_wildcard = "*"
   DEFAULT_DETINATION_PARSER.any_descendant_wildcard = "**"
 
+  val COPY = org.apache.qpid.proton.`type`.Symbol.getSymbol("copy");
+
   val JMS_SELECTOR = AmqpSymbol.valueOf("jms-selector")
+  val NO_LOCAL = AmqpSymbol.valueOf("no-local");
+  val DURABLE = new UnsignedInteger(2);
+
   val EMPTY_BYTE_ARRAY = Array[Byte]()
 
   def toBytes(value: Long): Array[Byte] = {
@@ -177,7 +184,7 @@ class AmqpProtocolHandler extends Protoc
   }
 
   var amqp_connection:AmqpTransport = _
-  var amqp_trace = true
+  var amqp_trace = false
 
   def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
 
@@ -353,13 +360,11 @@ class AmqpProtocolHandler extends Protoc
 
       receiver.getRemoteTarget() match {
         case target: Coordinator =>
-          //          pumpProtonToSocket();
-          //          receiver.setContext(coordinatorContext);
-          //          receiver.flow(1024 * 64);
-          //          receiver.open();
-          //          pumpProtonToSocket();
-          close_with_error(receiver, "txs not supported")
+          set_attachment(receiver, coordinatorContext)
+          receiver.flow(prefetch);
+          receiver.open();
           onComplete.run()
+
         case amqp_target: Target =>
 
           val (address, addresses, actualTarget) = decode_target(amqp_target)
@@ -419,9 +424,6 @@ class AmqpProtocolHandler extends Protoc
 
     def processReceiverClose(receiver: Receiver, onComplete: Task) {
       get_attachment(receiver) match {
-        case null =>
-          receiver.close()
-          onComplete.run()
         case route: AmqpProducerRoute =>
           // Lets disconnect the route.
           set_attachment(receiver, null)
@@ -434,17 +436,19 @@ class AmqpProtocolHandler extends Protoc
               onComplete.run()
             }
           }
+        case _ =>
+          receiver.close()
+          onComplete.run()
       }
     }
 
     override def processDelivery(delivery: engine.Delivery) {
       get_attachment(delivery.getLink) match {
         case null =>
-        case route: AmqpProducerRoute =>
-          route.process(delivery.asInstanceOf[DeliveryImpl])
+        case producer: ProducerSupport =>
+          producer.process(delivery.asInstanceOf[DeliveryImpl])
         case consumer: AmqpConsumer =>
           consumer.process(delivery.asInstanceOf[DeliveryImpl])
-        // TODO
       }
     }
 
@@ -453,7 +457,16 @@ class AmqpProtocolHandler extends Protoc
       sender.setSource(sender.getRemoteSource());
       sender.setTarget(sender.getRemoteTarget());
 
-      val source = sender.getRemoteSource().asInstanceOf[Source]
+      var source = sender.getRemoteSource().asInstanceOf[Source]
+      if( source == null ) {
+        // Source gets set to null when a durable sub is being ended.
+        source = new org.apache.qpid.proton.`type`.messaging.Source();
+        source.setAddress("/dsub/"+sender.getName);
+        source.setDurable(DURABLE)
+        source.setExpiryPolicy(TerminusExpiryPolicy.NEVER)
+        sender.setSource(source);
+      }
+
       val (address, requested_addresses, actual) = decode_source(source)
       sender.setSource(actual);
       if (requested_addresses == null) {
@@ -465,7 +478,12 @@ class AmqpProtocolHandler extends Protoc
 
       val filter = source.getFilter()
       val selector = if (filter != null) {
-        val value = filter.get(JMS_SELECTOR).asInstanceOf[DescribedType]
+        var value = filter.get(NO_LOCAL).asInstanceOf[DescribedType];
+        if( value!=null ) {
+          // TODO: setup a no-local filter.
+//            consumerInfo.setNoLocal(true);
+        }
+        value = filter.get(JMS_SELECTOR).asInstanceOf[DescribedType]
         if (value != null) {
           val selector = value.getDescribed().toString()
           try {
@@ -486,7 +504,7 @@ class AmqpProtocolHandler extends Protoc
 
       val presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
-      var browser = false
+      var browser = source.getDistributionMode() == COPY
       var browser_end = browser && true
       var exclusive = !browser && false
       var include_seq: Option[Long] = None
@@ -505,7 +523,7 @@ class AmqpProtocolHandler extends Protoc
         return
       }
 
-      var persistent = source.getDurable != null && source.getDurable.intValue()
== 1
+      val persistent = DURABLE == source.getDurable() && source.getExpiryPolicy ==
TerminusExpiryPolicy.NEVER
       val addresses: Array[_ <: BindAddress] = if (persistent) {
         val dsubs = ListBuffer[BindAddress]()
         val topics = ListBuffer[BindAddress]()
@@ -523,7 +541,9 @@ class AmqpProtocolHandler extends Protoc
         }
         sender.getName()
         val s = if (selector == null) null else selector._1
-        dsubs += SubscriptionAddress(destination_parser.decode_path(sender.getName), s, topics.toArray)
+        if( !topics.isEmpty ) {
+          dsubs += SubscriptionAddress(destination_parser.decode_path(sender.getName), s,
topics.toArray)
+        }
         dsubs.toArray
       } else {
         requested_addresses
@@ -534,11 +554,11 @@ class AmqpProtocolHandler extends Protoc
 
       link_counter += 1
       val id = link_counter
-      val consumer = new AmqpConsumer(sender, id, requested_addresses, presettle, selector,
browser, exclusive, include_seq, from_seq, browser_end);
+      val consumer = new AmqpConsumer(sender, id, addresses, presettle, selector, browser,
exclusive, include_seq, from_seq, browser_end);
       consumers += (id -> consumer)
 
       host.dispatch_queue {
-        val rc = host.router.bind(requested_addresses, consumer, security_context)
+        val rc = host.router.bind(consumer.addresses, consumer, security_context)
         queue {
           rc match {
             case Some(reason) =>
@@ -597,12 +617,12 @@ class AmqpProtocolHandler extends Protoc
     if( !disconnected ) {
       disconnected = true
 
-//      // Rollback any in-progress transactions..
-//      for( (id, tx) <- transactions ) {
-//        tx.rollback
-//      }
-//      transactions.clear()
-//
+      // Rollback any in-progress transactions..
+      for( (id, tx) <- transactions ) {
+        tx.rollback
+      }
+      transactions.clear()
+
       for (producer <- producers.values) {
         val addresses = producer.addresses
         host.dispatch_queue {
@@ -651,44 +671,6 @@ class AmqpProtocolHandler extends Protoc
     }
   }
 
-  trait ProducerSupport {
-    var current = new ByteArrayOutputStream();
-
-    def receiver: Receiver
-
-    def process(delivery: DeliveryImpl): Unit = {
-      if (!delivery.isReadable()) {
-        trace("it was not readable!");
-        return;
-      }
-
-      if (current == null) {
-        current = new ByteArrayOutputStream();
-      }
-
-      var data = new Array[Byte](1024 * 4);
-      var done = false
-      while (!done) {
-        val count = receiver.recv(data, 0, data.length)
-        if (count > 0) {
-          current.write(data, 0, count);
-        } else {
-          if (count == 0) {
-            // Expecting more deliveries..
-            return;
-          }
-          done = true
-        }
-      }
-
-      val buffer = current.toBuffer();
-      current = null;
-      onMessage(delivery, new AmqpMessage(buffer));
-    }
-
-    def onMessage(delivery: DeliveryImpl, buffer: AmqpMessage): Unit
-  }
-
   def decode_target(target: Target) = {
     var dynamic = target.getDynamic()
     if (dynamic) {
@@ -735,6 +717,43 @@ class AmqpProtocolHandler extends Protoc
 
   var temp_dest_counter = 0L
 
+  trait ProducerSupport {
+    var current = new ByteArrayOutputStream();
+
+    def process(delivery: DeliveryImpl): Unit = {
+      val receiver = delivery.getLink.asInstanceOf[Receiver]
+      if (!delivery.isReadable()) {
+        trace("it was not readable!");
+        return;
+      }
+
+      if (current == null) {
+        current = new ByteArrayOutputStream();
+      }
+
+      var data = new Array[Byte](1024 * 4);
+      var done = false
+      while (!done) {
+        val count = receiver.recv(data, 0, data.length)
+        if (count > 0) {
+          current.write(data, 0, count);
+        } else {
+          if (count == 0) {
+            // Expecting more deliveries..
+            return;
+          }
+          done = true
+        }
+      }
+
+      val buffer = current.toBuffer();
+      current = null;
+      onMessage(receiver, delivery, new AmqpMessage(buffer));
+    }
+
+    def onMessage(receiver:Receiver, delivery: DeliveryImpl, buffer: AmqpMessage): Unit
+  }
+
   class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress])
extends DeliveryProducerRoute(host.router) with ProducerSupport {
 
     val key = addresses.toList
@@ -758,7 +777,7 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    def onMessage(delivery: DeliveryImpl, message: AmqpMessage) = {
+    def onMessage(receiver:Receiver, delivery: DeliveryImpl, message: AmqpMessage) = {
       val d = new Delivery
       d.message = message
       d.size = message.encoded.length
@@ -794,9 +813,24 @@ class AmqpProtocolHandler extends Protoc
         delivery.settle()
       }
 
-      val accepted = producer_overflow.offer(d)
-      assert(accepted)
-      receiver.advance();
+      delivery.getRemoteState() match {
+        case state:TransactionalState =>
+          transactions.get(toLong(state.getTxnId())) match {
+            case Some(tx) =>
+              tx.add({ uow =>
+                d.uow = uow
+                val accepted = producer_overflow.offer(d)
+                assert(accepted)
+                receiver.advance();
+              })
+            case None =>
+              die("uknown-tx", "txid in the delivery remote state is invalid")
+          }
+        case _ =>
+          val accepted = producer_overflow.offer(d)
+          assert(accepted)
+          receiver.advance();
+      }
     }
   }
 
@@ -820,10 +854,38 @@ class AmqpProtocolHandler extends Protoc
                      val include_seq: Option[Long],
                      val from_seq: Long,
                      override val close_on_drain: Boolean
-                            ) extends BaseRetained with DeliveryConsumer {
+                            ) extends Task with Retained with DeliveryConsumer {
 
     override def toString = "amqp subscription:" + sender.getName + ", remote address: "
+ security_context.remote_address
 
+
+    /// Retained interface...
+    val base_retained = new BaseRetained {
+      override def dispose() {
+        do_dispose()
+        super.dispose()
+      }
+    }
+
+    def printST(name:String) = {
+      AmqpProtocolHandler.synchronized {
+        val e = new Exception
+        println(sender.getName+":"+name+":"+retained())
+        println("  "+e.getStackTrace.drop(2).take(10).mkString("\n  "))
+        System.out.flush()
+      }
+    }
+
+    def release() = {
+//      printST("release")
+      base_retained.release()
+    }
+    def retain() = {
+//      printST("retain")
+      base_retained.retain()
+    }
+    def retained() = base_retained.retained()
+
     ///////////////////////////////////////////////////////////////////
     // DeliveryConsumer Interface..
     ///////////////////////////////////////////////////////////////////
@@ -865,8 +927,9 @@ class AmqpProtocolHandler extends Protoc
 
     def close = {
       consumers -= subscription_id
+      val drop = sender.getSource.asInstanceOf[Source].getExpiryPolicy != TerminusExpiryPolicy.NEVER
       host.dispatch_queue {
-        host.router.unbind(addresses, this, false , security_context)
+        host.router.unbind(addresses, this, drop , security_context)
         release()
       }
     }
@@ -894,6 +957,19 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
+    // As the Task attachment to the Sender, we are run
+    // every time an event is fired on the sender endpoint.
+    def run() = {
+      queue.assertExecuting()
+      // If the endpoint is active, and we have been drained of msgs, let the remote end know about it.
+      if( sender.getLocalState == EndpointState.ACTIVE &&
+          sender.getRemoteState==EndpointState.ACTIVE &&
+          redeliveries.isEmpty &&
+          session_manager.overflowed_sessions.isEmpty ) {
+        sender.drained()
+      }
+    }
+
     val redeliveries = new util.LinkedList[(Session[Delivery], Delivery)]()
     val session_manager = new SessionSinkMux[Delivery](FullSink(), queue, Delivery, 100,
buffer_size) {
       override def time_stamp = broker.now
@@ -951,7 +1027,7 @@ class AmqpProtocolHandler extends Protoc
             pumpNeeded = true
             proton_delivery.setContext(value)
             if (presettle) {
-              settle(proton_delivery, Consumed, false);
+              settle(proton_delivery, Consumed, false, null);
             } else {
               sender.advance();
             }
@@ -973,26 +1049,42 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    def process(proton_delivery:DeliveryImpl) = {
+    def process(proton_delivery:DeliveryImpl):Unit = {
       val state = proton_delivery.getRemoteState();
       state match {
+        case outcome:proton_type.messaging.Outcome =>
+          process(proton_delivery, outcome, null)
+        case state:TransactionalState =>
+          transactions.get(toLong(state.getTxnId())) match {
+            case Some(tx) =>
+              tx.add({ uow =>
+                  process(proton_delivery, state.getOutcome, uow)
+              })
+            case None =>
+              die("uknown-tx", "txid in the delivery remote state is invalid")
+          }
+      }
+    }
+
+    def process(proton_delivery:DeliveryImpl, outcome:proton_type.messaging.Outcome, uow:StoreUOW):Unit
= {
+      outcome match {
         case null =>
           if( !proton_delivery.remotelySettled() ) {
               proton_delivery.disposition(new Accepted());
           }
-          settle(proton_delivery, Consumed, false);
+          settle(proton_delivery, Consumed, false, uow);
         case accepted:Accepted =>
           if( !proton_delivery.remotelySettled() ) {
               proton_delivery.disposition(new Accepted());
           }
-          settle(proton_delivery, Consumed, false);
-        case rejected:Rejected =>
+          settle(proton_delivery, Consumed, false, uow);
+        case rejected:proton_type.messaging.Rejected =>
           // re-deliver /w incremented delivery counter.
-          settle(proton_delivery, null, true);
-        case release:Released =>
+          settle(proton_delivery, null, true, uow);
+        case release:proton_type.messaging.Released =>
           // re-deliver && don't increment the counter.
-          settle(proton_delivery, null, false);
-        case modified:Modified =>
+          settle(proton_delivery, null, false, uow);
+        case modified:proton_type.messaging.Modified =>
           def b(v:java.lang.Boolean) = v!=null && v.booleanValue()
           var ackType = if(b(modified.getUndeliverableHere())) {
               // receiver does not want the message..
@@ -1002,11 +1094,11 @@ class AmqpProtocolHandler extends Protoc
             // Delivered ??
             null
           }
-          settle(proton_delivery, ackType, b(modified.getDeliveryFailed()));
+          settle(proton_delivery, ackType, b(modified.getDeliveryFailed()), uow);
       }
     }
 
-    def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean):Unit
= {
+    def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean,
uow:StoreUOW):Unit = {
       val ctx = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
       if( ctx==null ) {
         return
@@ -1024,23 +1116,14 @@ class AmqpProtocolHandler extends Protoc
       if( ackType == null ) {
         redeliveries.addFirst((session, apollo_delivery))
         session_manager.drain_overflow
+        delivery.settle()
       } else {
-
-        val remoteState = delivery.getRemoteState
-        if (remoteState != null && remoteState.isInstanceOf[TransactionalState])
{
-          val s: TransactionalState = remoteState.asInstanceOf[TransactionalState]
-          val txid = toLong(s.getTxnId)
-          async_die("txs-not-supported", "Transactions not yet supported")
-          return
-        }
-
         if( apollo_delivery.ack != null ) {
-          apollo_delivery.ack(ackType, null)
+          apollo_delivery.ack(ackType, uow)
         }
+        delivery.settle()
       }
-      delivery.settle()
       pump_out
-
     }
 
     class AmqpConsumerSession(p: DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery]
{
@@ -1075,7 +1158,7 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    override def dispose() = queue {
+    def do_dispose() = queue {
       def reject(value:(Session[Delivery], Delivery), result:DeliveryResult) ={
         val (_, delivery) = value
         if( delivery.ack!=null ) {
@@ -1099,10 +1182,115 @@ class AmqpProtocolHandler extends Protoc
 
       sender.close()
       pump_out
-      super.dispose()
     }
 
   }
 
+
+  class TransactionQueue {
+    // TODO: eventually we want to back this /w a broker Queue which
+    // can provide persistence and memory swapping.
+
+    val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
+
+    def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
+      queue += ((on_commit, on_rollback))
+    }
+
+    def commit(on_complete: => Unit) = {
+      if( host.store!=null ) {
+        val uow = host.store.create_uow
+//        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
+        uow.on_complete {
+//          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
+          on_complete
+        }
+        queue.foreach{ _._1(uow) }
+        uow.release
+      } else {
+        queue.foreach{ _._1(null) }
+        on_complete
+      }
+    }
+
+    def rollback = {
+      queue.foreach{ case (x, y) =>
+        if( y != null ) {
+          y()
+        }
+      }
+    }
+
+  }
+
+  val transactions = HashMap[Long, TransactionQueue]()
+
+  def create_tx_queue(txid:Long):TransactionQueue = {
+    if ( transactions.contains(txid) ) {
+      die("invalid-tx", "transaction allready started")
+    } else {
+      val queue = new TransactionQueue
+      transactions.put(txid, queue)
+      queue
+    }
+  }
+
+  def remove_tx_queue(txid:Long):TransactionQueue = {
+    transactions.remove(txid).getOrElse(die("invalid-tx", "transaction not active: %d".format(txid)))
+  }
+
+  var nextTransactionId = 0L;
+  object coordinatorContext extends ProducerSupport {
+
+    def onMessage(receiver: Receiver, delivery: DeliveryImpl, buffer: AmqpMessage) = {
+      val msg = buffer.decoded;
+      val action = msg.getBody().asInstanceOf[AmqpValue].getValue();
+      action match {
+        case declare: Declare =>
+          if (declare.getGlobalId() != null) {
+            throw new Exception("don't know how to handle a declare /w a set GlobalId");
+          }
+
+          val txid = nextTransactionId
+          nextTransactionId += 1
+
+          create_tx_queue(txid)
+
+          val declared = new Declared();
+          declared.setTxnId(new Binary(toBytes(txid)));
+          delivery.disposition(declared);
+          delivery.settle();
+
+        case discharge: Discharge =>
+          val txid = toLong(discharge.getTxnId());
+//                  ExceptionResponse er = (ExceptionResponse)response;
+//                  Rejected rejected = new Rejected();
+//                  ArrayList errors = new ArrayList();
+//                  errors.add(er.getException().getMessage());
+//                  rejected.setError(errors);
+//                  delivery.disposition(rejected);
+
+          val tx_queue = remove_tx_queue(txid);
+          if (discharge.getFail()) {
+            System.out.println("rollback transaction " + txid);
+            tx_queue.rollback
+            delivery.settle();
+            pump_out
+          } else {
+            System.out.println("commit transaction " + txid);
+            tx_queue.commit {
+              queue {
+                delivery.settle();
+                pump_out
+              }
+            }
+          }
+        case _ =>
+          throw new Exception("Expected coordinator message type: " + action.getClass());
+      }
+
+    }
+  }
+
 }
 

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/test.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/test.properties?rev=1416616&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/test.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/test.properties Mon Dec
 3 18:19:11 2012
@@ -0,0 +1,20 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# This config file is used by the joram jms tests.
+#
+timeout=5000
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java?rev=1416616&r1=1416615&r2=1416616&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
Mon Dec  3 18:19:11 2012
@@ -53,35 +53,28 @@ public class JoramJmsTest extends TestCa
 
         TestSuite suite = new TestSuite();
 
-        // TODO: Fix these tests..
-        if (false) {
-            // Fails due to durable subs not being implemented.
-            suite.addTestSuite(TopicSessionTest.class);
-            // Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl
vs QueueImpl mapping issues
+        // TODO: enable once QPID-4454 is fixed.
+        if(false) {
             suite.addTestSuite(MessageHeaderTest.class);
-            // Fails due to inconsistent Message mapping in the JMS client.
-            suite.addTestSuite(MessageTypeTest.class);
-            suite.addTestSuite(QueueBrowserTest.class);
-
         }
 
-        // TODO: enable once QPID 0.19 is released
-        if(false) {
+        // TODO: enable once QPID 0.20 is released
+        if(true) {
+            suite.addTestSuite(TopicSessionTest.class);
+            suite.addTestSuite(MessageTypeTest.class);
+            suite.addTestSuite(QueueBrowserTest.class);
             suite.addTestSuite(UnifiedSessionTest.class);
             suite.addTestSuite(TemporaryTopicTest.class);
             suite.addTestSuite(TopicConnectionTest.class);
         }
 
-        if( false ) {
+        // Passing tests against 0.18
+        if( true ) {
             suite.addTestSuite(SelectorSyntaxTest.class);
             suite.addTestSuite(QueueSessionTest.class);
             suite.addTestSuite(SelectorTest.class);
             suite.addTestSuite(TemporaryQueueTest.class);
             suite.addTestSuite(SessionTest.class);
-        }
-
-        // Passing tests
-        if( false ) {
             suite.addTestSuite(ConnectionTest.class);
             suite.addTestSuite(JMSXPropertyTest.class);
             suite.addTestSuite(MessageBodyTest.class);

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala?rev=1416616&r1=1416615&r2=1416616&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
Mon Dec  3 18:19:11 2012
@@ -26,7 +26,7 @@ import org.apache.qpid.proton.hawtdispat
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 
-class AmqpConnectionTest extends AmqpTestSupport {
+  class AmqpConnectionTest extends AmqpTestSupport {
 
   def print_result[T](action: String)(then: => Unit): Callback[T] = new Callback[T] {
     def onSuccess(value: T) {
@@ -56,7 +56,6 @@ class AmqpConnectionTest extends AmqpTes
     amqp.setUser("admin");
     amqp.setPassword("password");
 
-    val done = new CountDownLatch(1)
     val connection = AmqpConnection.connect(amqp)
     connection.queue() {
       var session = connection.createSession()
@@ -75,13 +74,11 @@ class AmqpConnectionTest extends AmqpTes
           def onMessageDelivery(delivery: MessageDelivery) = {
             println("Received: " + delivery.getMessage().getBody().asInstanceOf[AmqpValue].getValue);
             delivery.settle()
-            done.countDown()
+            connection.close()
           }
         })
       })
     }
-
-    done.await
     connection.waitForDisconnected()
   }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1416616&r1=1416615&r2=1416616&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Mon Dec  3 18:19:11 2012
@@ -1047,7 +1047,6 @@ class LocalRouter(val virtual_host:Virtu
 
   def unbind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, persistent:Boolean,
security: SecurityContext) = {
     dispatch_queue.assertExecuting()
-    consumer.retain
     addresses.foreach { address=>
       address.domain match {
         case "topic" =>
@@ -1059,7 +1058,6 @@ class LocalRouter(val virtual_host:Virtu
         case _ => sys.error("Unknown domain: "+address.domain)
       }
     }
-    consumer.release
   }
 
   def connect(addresses: Array[_ <: ConnectAddress], producer: BindableDeliveryProducer,
security: SecurityContext):Option[String] = {

Modified: activemq/activemq-apollo/trunk/apollo-scala/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-scala/pom.xml?rev=1416616&r1=1416615&r2=1416616&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-scala/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-scala/pom.xml Mon Dec  3 18:19:11 2012
@@ -142,12 +142,8 @@
         <version>${maven-surefire-plugin-version}</version>
         
         <configuration>
-          <!-- we must turn off the use of system class loader so our tests can find stuff
- otherwise ScalaSupport compiler can't find stuff -->
-          <useSystemClassLoader>false</useSystemClassLoader>
-          <!--forkMode>pertest</forkMode-->
           <childDelegation>false</childDelegation>
           <useFile>true</useFile>
-          <failIfNoTests>false</failIfNoTests>
         </configuration>
       </plugin>
 

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1416616&r1=1416615&r2=1416616&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Dec  3 18:19:11 2012
@@ -140,7 +140,7 @@
     <osgi.fragment.host>${project.groupId}.apollo-broker</osgi.fragment.host>
     <mvnplugins-version>1.15</mvnplugins-version>
     <qpid-proton-version>1.0-SNAPSHOT</qpid-proton-version>
-    <qpid-jms-version>0.18</qpid-jms-version>
+    <qpid-jms-version>0.22-SNAPSHOT</qpid-jms-version>
   </properties>
 
   <prerequisites>



Mime
View raw message