activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1421442 - in /activemq/activemq-apollo/trunk/apollo-amqp/src: main/scala/org/apache/activemq/apollo/amqp/ test/scala/org/apache/activemq/apollo/amqp/test/
Date Thu, 13 Dec 2012 19:36:58 GMT
Author: chirino
Date: Thu Dec 13 19:36:57 2012
New Revision: 1421442

URL: http://svn.apache.org/viewvc?rev=1421442&view=rev
Log:
Pick up proton API updates.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1421442&r1=1421441&r2=1421442&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
Thu Dec 13 19:36:57 2012
@@ -20,17 +20,17 @@ package org.apache.activemq.apollo.amqp
 import org.apache.activemq.apollo.broker.protocol
 import protocol.{MessageCodecFactory, MessageCodec}
 import java.nio.ByteBuffer
-import org.apache.qpid.proton.codec.{WritableBuffer, CompositeWritableBuffer}
+import org.apache.qpid.proton.codec.{DroppingWritableBuffer, WritableBuffer, CompositeWritableBuffer}
 import org.fusesource.hawtbuf.Buffer._
 import org.apache.activemq.apollo.broker.Message
 import org.apache.activemq.apollo.broker.store.MessageRecord
 import org.fusesource.hawtbuf.Buffer
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.fusesource.hawtbuf.UTF8Buffer
-import org.apache.qpid.proton.hawtdispatch.impl.DroppingWritableBuffer
-import org.apache.qpid.proton.`type`.{UnsignedLong, UnsignedInteger}
-import org.apache.qpid.proton.`type`.messaging.{Properties, Header}
+import org.apache.qpid.proton.amqp.{UnsignedLong, UnsignedInteger}
+import org.apache.qpid.proton.amqp.messaging.{Properties, Header}
 import org.apache.activemq.apollo.filter.Filterable
+import org.apache.qpid.proton.message.impl.MessageImpl
 
 object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
   def create = Array[MessageCodec](AmqpMessageCodec)
@@ -82,7 +82,7 @@ class AmqpMessage(private var encoded_bu
 
   def decoded = {
     if( decoded_message==null ) {
-      val amqp = new org.apache.qpid.proton.message.Message();
+      val amqp = new MessageImpl();
       var offset = encoded_buffer.offset
       var len = encoded_buffer.length
       while( len > 0 ) {
@@ -99,14 +99,13 @@ class AmqpMessage(private var encoded_bu
 
   override def encoded = {
     if( encoded_buffer == null ) {
-      var buffer = ByteBuffer.wrap(new Array[Byte](1024*4));
-      val overflow = new DroppingWritableBuffer();
-      var c = decoded_message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer),
overflow));
-      if( overflow.position() > 0 ) {
-          buffer = ByteBuffer.wrap(new Array[Byte](1024*4+overflow.position()));
-          c = decoded_message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+      var buffer = new Array[Byte](1024);
+      var c = decoded_message.asInstanceOf[MessageImpl].encode2(buffer, 0, buffer.length);
+      if( c >  buffer.length) {
+        buffer = new Array[Byte](c);
+        decoded_message.encode(buffer, 0, c);
       }
-      encoded_buffer = new Buffer(buffer.array(), 0, c)
+      encoded_buffer = new Buffer(buffer, 0, c)
     }
     encoded_buffer
   }
@@ -149,7 +148,7 @@ class AmqpMessage(private var encoded_bu
       var ma = decoded.getMessageAnnotations
       var rc = ma.getValue.get(name)
       if( rc == null ) {
-        rc = ma.getValue.get(org.apache.qpid.proton.`type`.Symbol.valueOf(name))
+        rc = ma.getValue.get(org.apache.qpid.proton.amqp.Symbol.valueOf(name))
       }
       rc.asInstanceOf[AnyRef]
     } else {

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1421442&r1=1421441&r2=1421442&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Thu Dec 13 19:36:57 2012
@@ -46,11 +46,12 @@ import org.apache.qpid.proton.framing.Tr
 import org.apache.qpid.proton.hawtdispatch.impl.{AmqpListener, AmqpTransport, AmqpProtocolCodec}
 import org.apache.qpid.proton.engine._
 import org.apache.qpid.proton.engine.impl.{ProtocolTracer, DeliveryImpl, LinkImpl, TransportImpl}
-import org.apache.qpid.proton.{`type` => proton_type}
-import proton_type.{Symbol => AmqpSymbol, UnsignedInteger, Binary, DescribedType}
-import proton_type.transport.SenderSettleMode
-import proton_type.messaging._
-import proton_type.transaction._
+import org.apache.qpid.proton.amqp
+import amqp.{Symbol => AmqpSymbol, UnsignedInteger, Binary, DescribedType}
+import amqp.transport.SenderSettleMode
+import amqp.messaging._
+import amqp.transaction._
+import org.apache.qpid.proton.message.impl.MessageImpl
 
 object AmqpProtocolHandler extends Log {
 
@@ -70,14 +71,12 @@ object AmqpProtocolHandler extends Log {
   DEFAULT_DESTINATION_PARSER.any_child_wildcard = "*"
   DEFAULT_DESTINATION_PARSER.any_descendant_wildcard = "**"
 
-  val COPY = org.apache.qpid.proton.`type`.Symbol.getSymbol("copy");
+  val COPY = org.apache.qpid.proton.amqp.Symbol.getSymbol("copy");
 
   val JMS_SELECTOR = AmqpSymbol.valueOf("jms-selector")
   val NO_LOCAL = AmqpSymbol.valueOf("no-local");
   val ORIGIN = AmqpSymbol.valueOf("origin");
 
-  val DURABLE = new UnsignedInteger(2);
-
   val EMPTY_BYTE_ARRAY = Array[Byte]()
 
   def toBytes(value: Long): Array[Byte] = {
@@ -191,7 +190,7 @@ class AmqpProtocolHandler extends Protoc
   }
 
   var amqp_connection:AmqpTransport = _
-  var amqp_trace = false
+  var amqp_trace = true
 
   def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
 
@@ -491,9 +490,9 @@ class AmqpProtocolHandler extends Protoc
       var source = sender.getRemoteSource().asInstanceOf[Source]
       if( source == null ) {
         // Source get set to null when a durable sub is being ended.
-        source = new org.apache.qpid.proton.`type`.messaging.Source();
+        source = new amqp.messaging.Source();
         source.setAddress("dsub://"+sender.getName);
-        source.setDurable(DURABLE)
+        source.setDurable(TerminusDurability.UNSETTLED_STATE)
         source.setExpiryPolicy(TerminusExpiryPolicy.NEVER)
         sender.setSource(source);
       }
@@ -542,7 +541,7 @@ class AmqpProtocolHandler extends Protoc
         PathParser.containsWildCards(requested_addresses(0).path)
       }
 
-      val persistent = DURABLE == source.getDurable() && source.getExpiryPolicy ==
TerminusExpiryPolicy.NEVER
+      val persistent = TerminusDurability.UNSETTLED_STATE == source.getDurable() &&
source.getExpiryPolicy == TerminusExpiryPolicy.NEVER
       val addresses: Array[_ <: BindAddress] = if (persistent) {
         val dsubs = ListBuffer[BindAddress]()
         val topics = ListBuffer[BindAddress]()
@@ -1047,7 +1046,7 @@ class AmqpProtocolHandler extends Protoc
 
               message_id_counter += 1
 
-              val message = new org.apache.qpid.proton.message.Message
+              val message = new MessageImpl
               message.setMessageId(session_id.get + message_id_counter)
               message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
               message.setContentType(content_type)
@@ -1104,7 +1103,7 @@ class AmqpProtocolHandler extends Protoc
     def process(proton_delivery:DeliveryImpl):Unit = {
       val state = proton_delivery.getRemoteState();
       state match {
-        case outcome:proton_type.messaging.Outcome =>
+        case outcome:amqp.messaging.Outcome =>
           process(proton_delivery, outcome, null)
         case state:TransactionalState =>
           transactions.get(toLong(state.getTxnId())) match {
@@ -1120,7 +1119,7 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    def process(proton_delivery:DeliveryImpl, outcome:proton_type.messaging.Outcome, uow:StoreUOW):Unit
= {
+    def process(proton_delivery:DeliveryImpl, outcome:amqp.messaging.Outcome, uow:StoreUOW):Unit
= {
       outcome match {
         case null =>
           if( !proton_delivery.remotelySettled() ) {
@@ -1132,13 +1131,13 @@ class AmqpProtocolHandler extends Protoc
               proton_delivery.disposition(new Accepted());
           }
           settle(proton_delivery, Consumed, false, uow);
-        case rejected:proton_type.messaging.Rejected =>
+        case rejected:amqp.messaging.Rejected =>
           // re-deliver /w incremented delivery counter.
           settle(proton_delivery, null, true, uow);
-        case release:proton_type.messaging.Released =>
+        case release:amqp.messaging.Released =>
           // re-deliver && don't increment the counter.
           settle(proton_delivery, null, false, uow);
-        case modified:proton_type.messaging.Modified =>
+        case modified:amqp.messaging.Modified =>
           def b(v:java.lang.Boolean) = v!=null && v.booleanValue()
           var ackType = if(b(modified.getUndeliverableHere())) {
               // receiver does not want the message..

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala?rev=1421442&r1=1421441&r2=1421442&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
Thu Dec 13 19:36:57 2012
@@ -17,8 +17,7 @@
 
 package org.apache.activemq.apollo.amqp.test
 
-import org.apache.qpid.proton.`type`.messaging.{AmqpValue, Source, Target}
-import java.util.concurrent.CountDownLatch
+import org.apache.qpid.proton.amqp.messaging.{AmqpValue, Source, Target}
 import org.fusesource.hawtdispatch._
 import org.apache.qpid.proton.hawtdispatch.api._
 
@@ -60,14 +59,14 @@ import org.apache.qpid.proton.hawtdispat
     connection.queue() {
       var session = connection.createSession()
       val target = new Target
-      target.setAddress("/queue/FOO")
+      target.setAddress("queue://FOO")
       val sender = session.createSender(target);
       val md = sender.send(session.createTextMessage("Hello World"))
       md.onSettle(print_result("message sent") {
         println("========================================================")
         println("========================================================")
         val source = new Source
-        source.setAddress("/queue/FOO")
+        source.setAddress("queue://FOO")
         val receiver = session.createReceiver(source);
         receiver.resume()
         receiver.setDeliveryListener(new AmqpDeliveryListener {



Mime
View raw message