activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1373021 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-stomp/src/test/scala/org/apache/ac...
Date Tue, 14 Aug 2012 18:46:54 GMT
Author: chirino
Date: Tue Aug 14 18:46:54 2012
New Revision: 1373021

URL: http://svn.apache.org/viewvc?rev=1373021&view=rev
Log:
Simplify test cases a bit.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?rev=1373021&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
Tue Aug 14 18:46:54 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.protocol.RawMessageCodec
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1373021&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
Tue Aug 14 18:46:54 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import java.io.IOException
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.util.{Log, ClassFinder}
+import org.apache.activemq.apollo.broker.{Broker, Message, BrokerConnection}
+import org.apache.activemq.apollo.dto.{SimpleProtocolFilterDTO, ProtocolFilterDTO, ConnectionStatusDTO}
+
+trait Protocol { // A protocol implementation: its id, a handler factory, and Message <-> MessageRecord conversion.
+  def id:String // key under which the Protocol companion object indexes this implementation
+  def createProtocolHandler:ProtocolHandler // supplies a ProtocolHandler for this protocol
+  def encode(message:Message):MessageRecord // converts a Message into a MessageRecord
+  def decode(message:MessageRecord):Message // converts a MessageRecord back into a Message
+}
+
+/**
+ * <p>Index of the Protocol singletons reported by `finder`, keyed by each protocol's id.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object Protocol {
+
+  val finder = new ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocol-factory.index",classOf[Protocol]) // presumably resolves the classes named in this index resource -- confirm against ClassFinder
+
+  val protocols = Map((for( provider <- finder.singletons ) yield { // id -> singleton pairs; `yield` has to precede the body for the comprehension to parse
+    (provider.id, provider)
+  }): _*)
+
+  def get(name:String):Option[Protocol] = protocols.get(name) // None when no discovered protocol has that id
+}
+
+object ProtocolHandler extends Log // companion whose Log members (e.g. trace) are imported by the ProtocolHandler trait
+
+/** Holds the BrokerConnection a handler is attached to and declares the transport event callbacks.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ProtocolHandler {
+  import ProtocolHandler._
+
+  def protocol:String // protocol name reported by the concrete handler
+
+  def session_id:Option[String] // optional session identifier supplied by the concrete handler
+
+  var connection:BrokerConnection = null; // stays null until set_connection is called
+
+  def set_connection(brokerConnection:BrokerConnection) = { // attaches this handler to a connection
+    this.connection = brokerConnection
+  }
+
+  def create_connection_status = new ConnectionStatusDTO // default: a fresh, unpopulated status DTO
+
+  def on_transport_failure(error:IOException) = { // default failure handling: log, then stop the connection
+    trace(error)
+    connection.stop(NOOP) // NOTE(review): throws NullPointerException if set_connection was never called -- confirm callers always set it first
+  }
+
+  def on_transport_disconnected = {} // no-op by default
+
+  def on_transport_connected = {} // no-op by default
+
+  def on_transport_command(command: AnyRef) = {} // no-op by default; the command is ignored
+
+}
+
+@deprecated(message="Please use the ProtocolFilter2 interface instead", since="1.3")
+trait ProtocolFilter { // legacy filter interface; SimpleProtocolFilter2Factory wraps it as an inbound-only ProtocolFilter2
+  def filter[T](command: T):T // returns the command to use in place of the one passed in
+}
+
+object ProtocolFilter2Factory { // Creates ProtocolFilter2 instances by delegating to the discovered Provider singletons.
+
+  val providers = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/protocol-filter-factory.index",classOf[Provider]) // presumably resolves the classes named in this index resource -- confirm against ClassFinder
+
+  trait Provider { // a Provider returns null from create when it does not handle the given DTO
+    def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2
+  }
+
+  def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2 = { // first non-null provider result wins
+    for( p <- providers.singletons ) {
+      val rc = p.create(dto, handler)
+      if( rc!=null ) {
+        return rc; // non-local return out of the for loop's closure
+      }  // a null result falls through to the next provider
+    }
+    throw new IllegalArgumentException("Cannot create a protocol filter for DTO: "+dto) // no provider produced a filter
+  }
+}
+
+object SimpleProtocolFilter2Factory extends ProtocolFilter2Factory.Provider { // Provider that instantiates the class named by a SimpleProtocolFilterDTO's `kind`.
+  def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2 = dto match
{
+    case dto:SimpleProtocolFilterDTO =>
+      val instance = Broker.class_loader.loadClass(dto.kind).newInstance().asInstanceOf[AnyRef] // loads dto.kind via Broker.class_loader and invokes its no-arg constructor
+      val filter = instance match {
+        case self:ProtocolFilter2 => self // already the current interface: use as-is
+        case self:ProtocolFilter => new ProtocolFilter2() { // adapt the legacy interface
+          override def filter_inbound[T](command: T): Option[T] = Some(self.filter(command)) // legacy filters never drop a frame
+          override def filter_outbound[T](command: T): Option[T] = Some(command) // outbound frames pass through unchanged
+        }
+        case null => null
+        case _ => throw new IllegalArgumentException("Invalid protocol filter type: "+instance.getClass)
+      }
+      type FilterDuckType = { // structural type: a filter may optionally expose these two vars
+        var protocol_handler:ProtocolHandler
+        var dto:SimpleProtocolFilterDTO
+      }
+      try {
+        filter.asInstanceOf[FilterDuckType].protocol_handler = handler // best-effort injection of the handler
+      } catch { case _ => } // any failure (e.g. the filter has no such member) is deliberately ignored
+      try {
+        filter.asInstanceOf[FilterDuckType].dto = dto // best-effort injection of the DTO
+      } catch { case _ => } // any failure is deliberately ignored
+      filter
+    case _ => null // not a SimpleProtocolFilterDTO: null lets ProtocolFilter2Factory.create try the next provider
+  }
+}
+
+object ProtocolFilter2 { // Convenience helpers for building filters from DTOs.
+  def create_filters(dtos:List[ProtocolFilterDTO], handler:ProtocolHandler) = { // one filter per DTO, in the same order
+    dtos.map(ProtocolFilter2Factory.create(_, handler)) // ProtocolFilter2Factory.create throws IllegalArgumentException when no provider handles a DTO
+  }
+}
+
+/**
+ * A Protocol filter can filter frames being sent/received to and from a client.  It can modify
+ * the frame or even drop it.
+ */
+abstract class ProtocolFilter2 {
+
+  /**
+   * Filters a command frame received from a client.
+   * Returns None if the filter wants to drop the frame.
+   */
+  def filter_inbound[T](frame: T):Option[T]
+
+  /**
+   * Filters a command frame being sent to a client.
+   * Returns None if the filter wants to drop the frame.
+   */
+  def filter_outbound[T](frame: T):Option[T]
+}
+

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala?rev=1373021&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
Tue Aug 14 18:46:54 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import java.nio.ByteBuffer
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker._
+import java.lang.{Class, String}
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import org.fusesource.hawtdispatch.transport.ProtocolCodec
+
+/**
+ * <p>Codec for RawMessage: copies the payload buffer to and from a MessageRecord tagged with the "raw" codec id.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object RawMessageCodec extends MessageCodec {
+  // PROTOCOL_ID is the codec tag written into every MessageRecord produced by encode.
+  val PROTOCOL_ID = new AsciiBuffer(id) // `id` is a def, so referencing it before its declaration is safe
+  def id = "raw"
+
+  override def encode(message: Message):MessageRecord = { // wraps a RawMessage payload in a new MessageRecord
+    message match {
+      case message:RawMessage =>
+        val rc = new MessageRecord
+        rc.codec = PROTOCOL_ID
+        rc.buffer = message.payload // the payload buffer is stored as-is
+        rc
+      case _ => throw new RuntimeException("Invalid message type"); // only RawMessage instances can be encoded
+    }
+  }
+
+  override def decode(message: MessageRecord) = { // rebuilds a RawMessage from the record's buffer
+    assert( message.codec == PROTOCOL_ID ) // the record must carry this codec's id
+    RawMessage(message.buffer)
+  }
+
+  def createProtocolHandler = throw new UnsupportedOperationException // not supported: always throws
+  def createProtocolCodec = throw new UnsupportedOperationException // not supported: always throws
+}
+
+case class RawMessage(payload:Buffer) extends Message { // Message backed by a single payload buffer.
+
+  def getBodyAs[T](toType : Class[T]) = { // returns the payload converted to toType, or null when the type is unsupported
+    if( toType.isAssignableFrom(classOf[Buffer]) ) { // Buffer itself or any supertype of it
+      toType.cast(payload)
+    } else if( toType == classOf[Array[Byte]] ) {
+      toType.cast(payload.toByteArray)
+    } else if( toType == classOf[ByteBuffer] ) {
+      toType.cast(payload.toByteBuffer)
+    } else {
+      null.asInstanceOf[T] // unsupported target type
+    }
+  }
+
+  def getLocalConnectionId = null // always null for raw messages
+  def getProperty(name: String) = null // always null, whatever the name
+  def expiration = 0L // fixed value
+  def persistent = false // fixed value
+  def priority = 0 // fixed value
+  def codec = RawMessageCodec // the codec that encodes/decodes this message type
+  def release() {} // no-op
+  def retain() {} // no-op
+  def retained() = 0 // always reports zero
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1373021&r1=1373020&r2=1373021&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
Tue Aug 14 18:46:54 2012
@@ -28,55 +28,13 @@ import java.util.Map.Entry
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.broker.security.SecurityContext
-import java.lang.String
 
 
-/**
- * <p>
- *   A protocol factory for the UDP protocol.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class UdpProtocolFactory extends ProtocolFactory {
-
-  def create() = UdpProtocol
-
-  def create(config: String): Protocol = {
-    if (config == "udp") {
-      return UdpProtocol
-    }
-    return null
-  }
-
-}
-
-/**
- * <p>
- *   The UDP protocol made for handling the UDP transport.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object UdpProtocol extends Protocol {
-
-  def id = "udp"
-
-  def createProtocolCodec = new UdpProtocolCodec()
-  def createProtocolHandler = new UdpProtocolHandler
-  def encode(message: Message) = throw new UnsupportedOperationException
-  def decode(message: MessageRecord) = throw new UnsupportedOperationException
-  def isIdentifiable = false
-  def maxIdentificaionLength = throw new UnsupportedOperationException()
-  def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
-
-}
-
 case class UdpMessage(from:SocketAddress, buffer:ByteBuffer)
 
 class UdpProtocolCodec extends ProtocolCodec {
 
-  def protocol = UdpProtocol.id
+  def protocol = "udp"
 
   var channel: DatagramChannel = null
   def setReadableByteChannel(channel: ReadableByteChannel) = {
@@ -119,45 +77,21 @@ class UdpProtocolCodec extends ProtocolC
 
 }
 
-trait UdpDecoder {
-  def init(handler:UdpProtocolHandler)
-  def address(message:UdpMessage):AsciiBuffer
-  def decode_addresses(value:AsciiBuffer):Array[SimpleAddress]
-  def decode_delivery(message:UdpMessage):Delivery
-}
-
-class DefaultUdpDecoder extends UdpDecoder {
+object UdpProtocolHandler extends Log
 
-  var topic_address:AsciiBuffer = _
-  var topic_address_decoded:Array[SimpleAddress] = _
+class UdpMessage {
 
-  def init(handler:UdpProtocolHandler) = {
-    val topic_name = Option(handler.config.topic).getOrElse("udp")
-    topic_address_decoded = LocalRouter.destination_parser.decode_multi_destination(topic_name,
(name)=> LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
-    topic_address = new AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
-  }
-
-  def address(message: UdpMessage) = topic_address
-  def decode_addresses(value: AsciiBuffer) = topic_address_decoded
-  def decode_delivery(message: UdpMessage) = {
-    val delivery = new Delivery
-    delivery.size = message.buffer.remaining()
-    delivery.message = RawMessage(new Buffer(message.buffer))
-    delivery
-  }
 }
 
-object UdpProtocolHandler extends Log
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class UdpProtocolHandler extends ProtocolHandler {
   import UdpProtocolHandler._
 
-  def protocol = UdpProtocol.id
+  def protocol = "udp"
   def session_id = None
 
-  var decoder:UdpDecoder = _
   var buffer_size = 0
   var host:VirtualHost = _
   var connection_log:Log = _
@@ -179,15 +113,6 @@ class UdpProtocolHandler extends Protoco
       case _ => None
     }).getOrElse(new UdpDTO)
 
-    val decoder_name = Option(config.decoder).getOrElse(classOf[DefaultUdpDecoder].getName)
-    decoder = try {
-      this.getClass.getClassLoader.loadClass(decoder_name).newInstance().asInstanceOf[UdpDecoder]
-    } catch {
-      case x =>
-        warn(x)
-        connection.stop(NOOP)
-        new DefaultUdpDecoder
-    }
     buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
     decoder.init(this)
 
@@ -260,11 +185,46 @@ class UdpProtocolHandler extends Protoco
         inbound_queue.removeFirst
       }
       
-      val delivery = decoder.decode_delivery(frame)
+      val delivery = decode_delivery(frame)
       inbound_queue_size += delivery.size
       inbound_queue.offer(delivery)
     }
   }
-  
+
+
+  abstract def decode_delivery(message: UdpMessage):Delivery
 }
 
+/**
+ * <p>
+ *   The UDP protocol made for handling the UDP transport.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class UdpProtocol extends Protocol {
+
+  def id = "udp"
+  def createProtocolCodec:ProtocolCodec = new UdpProtocolCodec()
+  def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler { // anonymous subclass that supplies decode_delivery
+
+    var topic_address:AsciiBuffer = _ // encoded form of topic_address_decoded; assigned in init
+    var topic_address_decoded:Array[SimpleAddress] = _ // assigned in init
+
+    def init(handler:UdpProtocolHandler) = { // NOTE(review): nothing in the visible diff calls this method on the handler -- confirm it is invoked
+      val topic_name = Option(handler.config.topic).getOrElse("udp") // falls back to "udp" when config.topic is null
+      topic_address_decoded = LocalRouter.destination_parser.decode_multi_destination(topic_name,
(name)=> LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
+      topic_address = new AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
+    }
+
+    def address(message: UdpMessage) = topic_address // same address for every message
+    def decode_addresses(value: AsciiBuffer) = topic_address_decoded // the argument is ignored
+    def decode_delivery(message: UdpMessage) = { // wraps the datagram buffer in a Delivery carrying a RawMessage
+      val delivery = new Delivery
+      delivery.size = message.buffer.remaining() // size is the number of bytes remaining in the buffer
+      delivery.message = RawMessage(new Buffer(message.buffer))
+      delivery
+    }
+
+  }
+}

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1373021&r1=1373020&r2=1373021&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Tue Aug 14 18:46:54 2012
@@ -487,44 +487,18 @@ class StompParallelTest extends StompTes
 
     var sub_id = 0;
     def test_selector(selector: String, headers: List[String], expected_matches: List[Int])
= {
-
-      client.write(
-        "SUBSCRIBE\n" +
-                "destination:/topic/selected\n" +
-                "selector:" + selector + "\n" +
-                "receipt:0\n" +
-                "id:" + sub_id + "\n" +
-                "\n")
-      wait_for_receipt("0")
-
+      subscribe(""+sub_id, "/topic/selected-"+sub_id, headers="selector:" + selector + "\n")
       var id = 1;
-
-      headers.foreach {
-        header =>
-          client.write(
-            "SEND\n" +
-                    "destination:/topic/selected\n" +
-                    header + "\n" +
-                    "\n" +
-                    "message:" + id + "\n")
-          id += 1;
+      for( header <- headers) {
+        async_send("/topic/selected-"+sub_id, "message:%d:%d\n".format(sub_id, id), header
+ "\n")
+        id += 1;
       }
-
-      expected_matches.foreach {
-        id =>
+      for( id <- expected_matches) {
           val frame = client.receive()
           frame should startWith("MESSAGE\n")
-          frame should endWith regex ("\n\nmessage:" + id + "\n")
+          frame should endWith("\n\nmessage:%d:%d\n".format(sub_id, id))
       }
-
-      client.write(
-        "UNSUBSCRIBE\n" +
-                "id:" + sub_id + "\n" +
-                "receipt:0\n" +
-                "\n")
-
-      wait_for_receipt("0")
-
+      unsubscribe(""+sub_id)
       sub_id += 1
     }
 
@@ -759,53 +733,23 @@ class StompParallelTest extends StompTes
 
   test("Topic /w Durable sub retains messages.") {
     connect("1.1")
-
-    def put(id: Int) = {
-      client.write(
-        "SEND\n" +
-                "destination:/topic/updates2\n" +
-                "\n" +
-                "message:" + id + "\n")
-    }
-
-    client.write(
-      "SUBSCRIBE\n" +
-              "destination:/topic/updates2\n" +
-              "id:my-sub-name\n" +
-              "persistent:true\n" +
-              "include-seq:seq\n" +
-              "receipt:0\n" +
-              "\n")
-    wait_for_receipt("0")
+    val dest = next_id("/topic/dsub_test_")
+    subscribe("my-sub-name", dest, persistent=true)
     client.close
 
     // Close him out.. since persistent:true then
     // the topic subscription will be persistent across client
     // connections.
-
     connect("1.1")
-    put(1)
-    put(2)
-    put(3)
-
-    client.write(
-      "SUBSCRIBE\n" +
-              "destination:/topic/updates2\n" +
-              "id:my-sub-name\n" +
-              "persistent:true\n" +
-              "include-seq:seq\n" +
-              "\n")
-
-    def get(id: Int) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should include("subscription:my-sub-name\n")
-      frame should endWith regex ("\n\nmessage:" + id + "\n")
-    }
-
-    get(1)
-    get(2)
-    get(3)
+    async_send(dest, 1)
+    async_send(dest, 2)
+    async_send(dest, 3)
+
+    subscribe("my-sub-name", dest, persistent=true)
+
+    assert_received(1, "my-sub-name")
+    assert_received(2, "my-sub-name")
+    assert_received(3, "my-sub-name")
   }
 
   test("Queue and a selector") {



Mime
View raw message