activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1355565 - in /activemq/activemq-apollo/trunk/apollo-amqp/src: main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
Date Fri, 29 Jun 2012 21:28:47 GMT
Author: chirino
Date: Fri Jun 29 21:28:46 2012
New Revision: 1355565

URL: http://svn.apache.org/viewvc?rev=1355565&view=rev
Log:
Pick up the fusesource API updates and use them in a new client test.

Added:
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1355565&r1=1355564&r2=1355565&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Fri Jun 29 21:28:46 2012
@@ -35,17 +35,16 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.amqp.dto._
 
 import org.fusesource.amqp._
-import org.fusesource.amqp.Callback
-import org.fusesource.amqp.codec.api.AnnotatedMessage
-import org.fusesource.amqp.codec.marshaller.MessageSupport
-import org.fusesource.amqp.codec.types._
+import org.fusesource.amqp.callback._
+import org.fusesource.amqp.callback.Callback
+import org.fusesource.amqp.types._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object AMQPMessage {
 
-  def apply(annotated:AnnotatedMessage):AMQPMessage = {
+  def apply(annotated:Envelope):AMQPMessage = {
     val payload = MessageSupport.toBuffer(annotated)
     val rc = AMQPMessage(payload)
     rc._annotated = annotated
@@ -53,14 +52,14 @@ object AMQPMessage {
   }
 }
 
-case class AMQPMessage(payload:Buffer) extends Message {
+case class AMQPMessage(payload:Buffer) extends org.apache.activemq.apollo.broker.Message
{
   import AmqpProtocolHandler._
   def protocol = AmqpProtocol
 
-  var _annotated:AnnotatedMessage = _
+  var _annotated:Envelope = _
   def annotated = {
     if ( _annotated ==null ) {
-      _annotated = MessageSupport.decodeAnnotatedMessage(payload)
+      _annotated = MessageSupport.decodeEnvelope(payload)
     }
     _annotated
   }
@@ -196,8 +195,7 @@ class AmqpProtocolHandler extends Protoc
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.find( _.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new
AmqpDTO)
 
-    val options = new AMQPConnectionOptions
-    options.setServer(true);
+    val options = new AMQPServerConnectionOptions
     options.setTransport(connection.transport);
     options.setMaxFrameSize(1024*4)
     options.setIdleTimeout(-1);
@@ -213,7 +211,13 @@ class AmqpProtocolHandler extends Protoc
     })
     options.setListener(new AMQPConnection.Listener(){
 
-      override def onBegin(begin: Begin) = new AMQPSessionOptions(100, 100, session_listener)
+      override def onBegin(begin: Begin) = {
+        val rc = new AMQPServerSessionOptions
+        rc.setIncomingWindow(100)
+        rc.setOutgoingWindow(100)
+        rc.setListener(session_listener)
+        rc
+      }
 
       override def onAccepted(session: AMQPSession) {
         connection_log.info("accepted: "+session)
@@ -374,13 +378,12 @@ class AmqpProtocolHandler extends Protoc
     var receiver: AMQPReceiver = null
     // create the producer route...
     val options = new AMQPReceiverOptions();
-    options.source = attach.getSource.asInstanceOf[Source]
-    options.target = attach.getTarget.asInstanceOf[Target]
-    options.name = attach.getName
-    options.senderSettleMode = SenderSettleMode.valueOf(attach.getSndSettleMode)
-    options.receiverSettleMode = ReceiverSettleMode.valueOf(attach.getRcvSettleMode)
-
-    options.maxMessageSize = 10 * 1024 * 1024;
+    options.setSource(attach.getSource.asInstanceOf[Source])
+    options.setTarget(attach.getTarget.asInstanceOf[Target])
+    options.setName(attach.getName)
+    options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
+    options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
+    options.setMaxMessageSize(10 * 1024 * 1024);
 
     def pump = {
       while (target.is_connected && !target.full && receiver.peek() != null)
{
@@ -424,7 +427,7 @@ class AmqpProtocolHandler extends Protoc
     target.refiller = ^ {
       pump
     }
-    options.listener = new AMQPEndpoint.Listener {
+    options.setListener(new AMQPEndpoint.Listener {
       override def onTransfer() = pump
 
       override def onClosed(senderClosed: Boolean, error: Error) {
@@ -435,7 +438,7 @@ class AmqpProtocolHandler extends Protoc
           host.router.disconnect(target.addresses, target)
         }
       }
-    }
+    })
 
     // start with 0 credit window so that we don't receive any messages
     // until we have verified if that we can connect to the destination..
@@ -517,7 +520,7 @@ class AmqpProtocolHandler extends Protoc
 
         var annotated = if( message.protocol eq AmqpProtocol ) {
           val original = message.asInstanceOf[AMQPMessage].annotated
-          var annotated = new AnnotatedMessageImpl
+          var annotated = new Envelope
           annotated.setHeader(header)
           annotated.setDeliveryAnnotations(original.getDeliveryAnnotations)
           annotated.setMessageAnnotations(original.getMessageAnnotations)
@@ -531,14 +534,15 @@ class AmqpProtocolHandler extends Protoc
             case _ => (message.encoded, "protocol/"+message.protocol.id())
           }
           
-          val bare = new ValueMessageImpl(new AMQPBinary(body))
+          val bare = new types.Message
+          bare.setData(new Data(body))
           var properties = new Properties()
           properties.setContentType(ascii(content_type))
           if( delivery.expiration!= 0 ) {
             properties.setAbsoluteExpiryTime(new Date(delivery.expiration))
           }
           bare.setProperties(properties)
-          var annotated = new AnnotatedMessageImpl
+          var annotated = new Envelope
           annotated.setHeader(header)
           annotated.setMessage(bare)
           annotated
@@ -555,7 +559,7 @@ class AmqpProtocolHandler extends Protoc
     // AMQPEndpoint.Listener interface..
     ///////////////////////////////////////////////////////////////////
     object endpoint_listener extends AMQPEndpoint.Listener {
-      override def onTransfer = {
+      override def onTransfer = queue {
         sink.refiller.run()
       }
       override def onClosed(senderClosed: Boolean, error: Error) {
@@ -601,25 +605,28 @@ class AmqpProtocolHandler extends Protoc
 
   def attach_receiver(attach: Attach, address: String, requested_addresses:Array[SimpleAddress],
callback: Callback[AMQPEndpoint]) = try {
     val options = new AMQPSenderOptions();
-    options.source = attach.getSource.asInstanceOf[Source]
-    options.source.setDefaultOutcome(new Released())
+
+    val src = attach.getSource.asInstanceOf[Source]
+    src.setDefaultOutcome(new Released())
 
     if (attach.getSndSettleMode == SenderSettleMode.SETTLED.getValue) {
       // if we are settling... then no other outcomes are possible..
-      options.source.setOutcomes(Array())
+      src.setOutcomes(Array())
     } else {
-      options.source.setOutcomes(Array(
+      src.setOutcomes(Array(
         new AMQPSymbol(Accepted.SYMBOLIC_ID),
         new AMQPSymbol(Rejected.SYMBOLIC_ID),
         new AMQPSymbol(Released.SYMBOLIC_ID),
         new AMQPSymbol(Modified.SYMBOLIC_ID)
       ))
     }
-    options.target = attach.getTarget.asInstanceOf[Target]
-    options.name = attach.getName
-    options.senderSettleMode = SenderSettleMode.valueOf(attach.getSndSettleMode)
-    options.receiverSettleMode = ReceiverSettleMode.valueOf(attach.getRcvSettleMode)
-    options.maxMessageSize = 10 * 1024 * 1024;
+    options.setSource(src)
+
+    options.setTarget(attach.getTarget.asInstanceOf[Target])
+    options.setName(attach.getName)
+    options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
+    options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
+    options.setMaxMessageSize(10 * 1024 * 1024);
 
 
     val subscription_id = attach.getName
@@ -676,7 +683,7 @@ class AmqpProtocolHandler extends Protoc
     val from_seq = from_seq_opt.getOrElse(0L)
 
     val source = new AMQPConsumer(subscription_id, addresses, selector, browser, exclusive,
include_seq, from_seq, browser_end);
-    options.listener = source.endpoint_listener
+    options.setListener(source.endpoint_listener)
     source.sender = AMQP.createSender(options)
 
     host.dispatch_queue {

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala?rev=1355565&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
Fri Jun 29 21:28:46 2012
@@ -0,0 +1,96 @@
+package org.apache.activemq.apollo.amqp
+
+import org.fusesource.amqp.blocking.AMQP
+import org.fusesource.amqp.types._
+import org.fusesource.amqp._
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Smoke test that round-trips messages through the broker using the
+ * FuseSource blocking AMQP client: it sends `nMsgs` messages to a queue on
+ * one session and then receives them back on a second session.
+ *
+ * NOTE(review): `port` is not defined in this class; presumably it is
+ * inherited from AmqpTestSupport -- confirm.
+ */
+class FuseSourceClientTest extends AmqpTestSupport {
+
+  test("broker") {
+
+//    val port = 5672
+//    val queue = "testqueue"
+
+    val queue = "/queue/fstestqueue"
+
+    val nMsgs = 1
+//    val qos = QoS.AT_MOST_ONCE
+
+    try {
+      val connect_options = new AMQPClientOptions
+      connect_options.setHost("127.0.0.1", port)
+      connect_options.setContainerId("client")
+      connect_options.setIdleTimeout(-1)
+      connect_options.setMaxFrameSize(1024*4)
+//      connect_options.setListener(new AMQPConnection.Listener())
+      val connection = AMQP.open(connect_options);
+      {
+        // Producer side: attach a sender to the queue and publish nMsgs messages.
+        val data = "x" * 10 // 1024*20
+
+        val session = connection.createSession(10, 10)
+        val sender_options = new AMQPSenderOptions
+        sender_options.setQoS(AMQPQoS.AT_MOST_ONCE)
+        sender_options.setTarget(queue);
+        val p = AMQP.createSender(sender_options)
+        p.attach(session);
+
+        for (i <- 0 until nMsgs) {
+          val s = "Message #" + (i + 1)
+          println("Sending " + s)
+          p.send(MessageSupport.message(s+", data: "+data))
+        }
+
+        p.close()
+        session.close()
+      }
+      {
+        // Consumer side: attach a receiver to the same queue and read the messages back.
+        val session = connection.createSession(10, 10)
+
+        val receiver_options = new AMQPReceiverOptions
+        receiver_options.setQoS(AMQPQoS.AT_MOST_ONCE)
+        receiver_options.setSource(queue);
+        val c = AMQP.createReceiver(receiver_options)
+        c.attach(session);
+
+        // Receive messages non-transacted
+        for (i <- 0 until nMsgs) {
+          val msg = c.receive();
+          // Fail fast when no message arrives. The guard needs its own body:
+          // without one, the match below becomes the body of the `if` and is
+          // evaluated only when msg is null.
+          if (msg == null) {
+            throw new IllegalStateException("receive() returned null for message #" + (i + 1))
+          }
+
+          msg.getMessage().getData match {
+            case value:AMQPString =>
+              println("Received: " + value.getValue());
+          }
+//          if (!msg.isSettled())
+//            msg.accept();
+        }
+        c.close()
+        session.close()
+      }
+      connection.close()
+    } catch {
+      case e: Exception => {
+        e.printStackTrace
+        // Rethrow so a broken round trip fails the test instead of passing silently.
+        throw e
+      }
+    }
+
+  }
+
+}
\ No newline at end of file



Mime
View raw message