activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1295355 - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-broker/src/main/scala/org/apach...
Date Thu, 01 Mar 2012 00:24:42 GMT
Author: chirino
Date: Thu Mar  1 00:24:41 2012
New Revision: 1295355

URL: http://svn.apache.org/viewvc?rev=1295355&view=rev
Log:
Upgrade to next hawtdispatch version.. Add a UDP based transport and protocol.  We should
be abel to take syslog events to deliver them to a topic.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
Thu Mar  1 00:24:41 2012
@@ -14,4 +14,6 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.AnyProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.protocol.AnyProtocolFactory
+org.apache.activemq.apollo.broker.protocol.UdpProtocolFactory
+org.apache.activemq.apollo.broker.protocol.RawProtocolFactory
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
Thu Mar  1 00:24:41 2012
@@ -18,3 +18,4 @@ org.apache.activemq.apollo.broker.transp
 org.apache.activemq.apollo.broker.transport.TcpTransportFactory
 org.apache.activemq.apollo.broker.transport.SslTransportFactory
 org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory
+org.apache.activemq.apollo.broker.transport.UdpTransportFactory

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala?rev=1295355&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
Thu Mar  1 00:24:41 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import java.nio.ByteBuffer
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker._
+import java.lang.{Class, String}
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class RawProtocolFactory extends ProtocolFactory {
+  def create() = RawProtocol
+  def create(config: String): Protocol = {
+    config match {
+      case "raw" => RawProtocol
+      case _ => null
+    }
+  }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object RawProtocol extends Protocol {
+  
+  val PROTOCOL_ID = new AsciiBuffer(id)
+  def id = "raw"
+
+  def encode(message: Message):MessageRecord = {
+    message match {
+      case message:RawMessage =>
+        val rc = new MessageRecord
+        rc.protocol = PROTOCOL_ID
+        rc.buffer = message.payload
+        rc
+      case _ => throw new RuntimeException("Invalid message type");
+    }
+  }
+
+  def decode(message: MessageRecord) = {
+    assert( message.protocol == PROTOCOL_ID )
+    RawMessage(message.buffer)
+  }
+
+  def createProtocolCodec = throw new UnsupportedOperationException()
+  def createProtocolHandler = throw new UnsupportedOperationException()
+  def isIdentifiable = false
+  def maxIdentificaionLength = throw new UnsupportedOperationException()
+  def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
+}
+
+case class RawMessage(payload:Buffer) extends Message {
+
+  def getBodyAs[T](toType : Class[T]) = {
+    if( toType.isAssignableFrom(classOf[Buffer]) ) {
+      toType.cast(payload)
+    } else if( toType == classOf[Array[Byte]] ) {
+      toType.cast(payload.toByteArray)
+    } else if( toType == classOf[ByteBuffer] ) {
+      toType.cast(payload.toByteBuffer)
+    } else {
+      null
+    }
+  }
+
+  def getLocalConnectionId = null
+  def getProperty(name: String) = null
+  def expiration = 0L
+  def persistent = false
+  def priority = 0
+  def protocol = RawProtocol
+  def release() {}
+  def retain() {}
+  def retained() = 0
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1295355&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
Thu Mar  1 00:24:41 2012
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtdispatch.transport.ProtocolCodec
+import java.nio.ByteBuffer
+import org.fusesource.hawtdispatch._
+import java.nio.channels.{DatagramChannel, WritableByteChannel, ReadableByteChannel}
+import java.net.SocketAddress
+import org.apache.activemq.apollo.dto.{UdpDTO, AcceptingConnectorDTO}
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import java.util.Map.Entry
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import java.lang.String
+
+
+/**
+ * <p>
+ *   A protocol factory for the UDP protocol.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class UdpProtocolFactory extends ProtocolFactory {
+
+  def create() = UdpProtocol
+
+  def create(config: String): Protocol = {
+    if (config == "udp") {
+      return UdpProtocol
+    }
+    return null
+  }
+
+}
+
+/**
+ * <p>
+ *   The UDP protocol made for handling the UDP transport.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object UdpProtocol extends Protocol {
+
+  def id = "udp"
+
+  def createProtocolCodec = new UdpProtocolCodec()
+  def createProtocolHandler = new UdpProtocolHandler
+  def encode(message: Message) = throw new UnsupportedOperationException
+  def decode(message: MessageRecord) = throw new UnsupportedOperationException
+  def isIdentifiable = false
+  def maxIdentificaionLength = throw new UnsupportedOperationException()
+  def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
+
+}
+
+case class UdpMessage(from:SocketAddress, buffer:ByteBuffer)
+
+class UdpProtocolCodec extends ProtocolCodec {
+
+  def protocol = UdpProtocol.id
+
+  var channel: DatagramChannel = null
+  def setReadableByteChannel(channel: ReadableByteChannel) = {
+    this.channel = channel.asInstanceOf[DatagramChannel]
+  }
+
+  var read_counter = 0L
+  var read_read_size = 0L
+
+  def read: AnyRef = {
+    if (channel == null) {
+      throw new IllegalStateException
+    }
+    val buffer = ByteBuffer.allocate(channel.socket().getReceiveBufferSize)
+    val from = channel.receive(buffer)
+    if( from == null ) {
+      null
+    } else {
+      buffer.flip()
+      read_read_size = buffer.remaining()
+      read_counter += read_read_size
+      UdpMessage(from, buffer)
+    }
+  }
+
+  def getLastReadSize = read_read_size
+  def getReadCounter = read_counter
+  def getReadBufferSize = channel.socket().getReceiveBufferSize
+
+  def unread(buffer: Array[Byte]) = throw new UnsupportedOperationException()
+
+  // This protocol only supports receiving..
+  def setWritableByteChannel(channel: WritableByteChannel) = {}
+  def write(value: AnyRef) = ProtocolCodec.BufferState.FULL
+  def full: Boolean = true
+  def flush = ProtocolCodec.BufferState.FULL
+  def getWriteCounter = 0L
+  def getLastWriteSize = 0
+  def getWriteBufferSize = 0
+
+}
+
+trait UdpDecoder {
+  def init(handler:UdpProtocolHandler)
+  def address(message:UdpMessage):AsciiBuffer
+  def decode_addresses(value:AsciiBuffer):Array[SimpleAddress]
+  def decode_delivery(message:UdpMessage):Delivery
+}
+
+class DefaultUdpDecoder extends UdpDecoder {
+
+  var topic_address:AsciiBuffer = _
+  var topic_address_decoded:Array[SimpleAddress] = _
+
+  def init(handler:UdpProtocolHandler) = {
+    val topic_name = Option(handler.config.topic).getOrElse("udp")
+    topic_address_decoded = LocalRouter.destination_parser.decode_multi_destination(topic_name,
(name)=> LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
+    topic_address = new AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
+  }
+
+  def address(message: UdpMessage) = topic_address
+  def decode_addresses(value: AsciiBuffer) = topic_address_decoded
+  def decode_delivery(message: UdpMessage) = {
+    val delivery = new Delivery
+    delivery.size = message.buffer.remaining()
+    delivery.message = RawMessage(new Buffer(message.buffer))
+    delivery
+  }
+}
+
+object UdpProtocolHandler extends Log
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class UdpProtocolHandler extends ProtocolHandler {
+  import UdpProtocolHandler._
+
+  def protocol = UdpProtocol.id
+  def session_id = None
+
+  var decoder:UdpDecoder = _
+  var buffer_size = 0
+  var host:VirtualHost = _
+  var connection_log:Log = _
+  var config:UdpDTO = _
+
+  def broker = connection.connector.broker
+  def queue = connection.dispatch_queue
+
+  override def on_transport_connected = {
+    connection.transport.resumeRead
+    import collection.JavaConversions._
+
+    config = (connection.connector.config match {
+      case connector_config:AcceptingConnectorDTO =>
+        connector_config.protocols.flatMap{ _ match {
+          case x:UdpDTO => Some(x)
+          case _ => None
+        }}.headOption
+      case _ => None
+    }).getOrElse(new UdpDTO)
+
+    val decoder_name = Option(config.decoder).getOrElse(classOf[DefaultUdpDecoder].getName)
+    decoder = try {
+      this.getClass.getClassLoader.loadClass(decoder_name).newInstance().asInstanceOf[UdpDecoder]
+    } catch {
+      case x =>
+        warn(x)
+        connection.stop()
+        new DefaultUdpDecoder
+    }
+    buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
+    decoder.init(this)
+
+    broker.dispatch_queue {
+      var host = broker.get_default_virtual_host
+      queue {
+        this.host = host
+        connection_log = this.host.connection_log
+        connection.transport.resumeRead()
+        if(host==null) {
+          warn("Could not find default virtual host")
+          connection.stop()
+        }
+      }
+    }
+    
+  }
+
+  var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](1000) {
+    override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
+      host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
+    }
+  }
+
+  override def on_transport_command(command: AnyRef) = {
+    val msg = command.asInstanceOf[UdpMessage]
+    val address = decoder.address(msg)
+    var route = producerRoutes.get(address);
+    if( route == null ) {
+      route = new StompProducerRoute(address)
+      producerRoutes.put(address, route)
+      val security_context = new SecurityContext
+      security_context.connector_id = connection.connector.id
+      security_context.local_address = connection.transport.getLocalAddress
+      host.dispatch_queue {
+        val rc = host.router.connect(route.addresses, route, security_context)
+        if( rc.isDefined ) {
+
+        }
+      }
+    }
+    route.send(msg);
+  }
+
+  class StompProducerRoute(dest: AsciiBuffer) extends DeliveryProducerRoute(host.router)
{
+    val addresses = decoder.decode_addresses(dest)
+    val key = addresses.toList
+    
+    override def send_buffer_size = buffer_size
+    override def connection = Some(UdpProtocolHandler.this.connection)
+    override def dispatch_queue = queue
+
+    var inbound_queue_size = 0
+
+    val sink_switch = new MutableSink[Delivery]()
+
+    val inbound_queue = new OverflowSink[Delivery](sink_switch) {
+      override protected def onDelivered(value: Delivery) = {
+        inbound_queue_size -= value.size
+      }
+    }
+
+    override protected def on_connected = {
+      sink_switch.downstream = Some(this)
+    }
+
+    def send(frame:UdpMessage) = {
+      // Drop older entries to make room for this new one..
+      while( inbound_queue_size >= buffer_size ) {
+        inbound_queue.removeFirst
+      }
+      
+      val delivery = decoder.decode_delivery(frame)
+      inbound_queue_size += delivery.size
+      inbound_queue.offer(delivery)
+    }
+  }
+  
+}
+

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java?rev=1295355&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
Thu Mar  1 00:24:41 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.transport;
+
+import org.apache.activemq.apollo.util.IntrospectionSupport;
+import org.apache.activemq.apollo.util.URISupport;
+import org.fusesource.hawtdispatch.transport.Transport;
+import org.fusesource.hawtdispatch.transport.TransportServer;
+import org.fusesource.hawtdispatch.transport.UdpTransport;
+import org.fusesource.hawtdispatch.transport.UdpTransportServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class UdpTransportFactory implements TransportFactory.Provider {
+    private static final Logger LOG = LoggerFactory.getLogger(UdpTransportFactory.class);
+
+    public TransportServer bind(String location) throws Exception {
+
+        URI uri = new URI(location);
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+
+        UdpTransportServer server = createUdpTransportServer(uri, options);
+        if (server == null) return null;
+
+        Map<String, String> copy = new HashMap<String, String>(options);
+        IntrospectionSupport.setProperties(server, options);
+        return server;
+    }
+
+    public Transport connect(String location) throws Exception {
+        throw new UnsupportedOperationException() ;
+    }
+
+    protected UdpTransportServer createUdpTransportServer(final URI location, final Map<String,
String> options) throws IOException, URISyntaxException, Exception {
+        if( !location.getScheme().equals("udp") ) {
+            return null;
+        }
+
+        return new UdpTransportServer(location) {
+            @Override
+            protected UdpTransport createTransport() {
+                UdpTransport transport = super.createTransport();
+                IntrospectionSupport.setProperties(transport, options);
+                return transport;
+            }
+        };
+    }
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
Thu Mar  1 00:24:41 2012
@@ -36,7 +36,8 @@ public class Module implements DtoModule
                 QueueDestinationDTO.class,
                 NullStoreDTO.class,
                 SimpleStoreStatusDTO.class,
-                DetectDTO.class
+                DetectDTO.class,
+                UdpDTO.class
         };
     }
 }

Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java?rev=1295355&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
Thu Mar  1 00:24:41 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Configuration for the udp protocol.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="udp")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UdpDTO extends ProtocolDTO {
+
+    /**
+     * Class name of the decoder that will be used to interpret the
+     * UDP message
+     */
+    @XmlAttribute(name="decoder")
+    public String decoder;
+
+    @XmlAttribute(name="buffer_size")
+    public String buffer_size;
+
+    @XmlAttribute(name="topic")
+    public String topic;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+
+        UdpDTO detectDTO = (UdpDTO) o;
+
+        if (decoder != null ? !decoder.equals(detectDTO.decoder) : detectDTO.decoder != null)
return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (decoder != null ? decoder.hashCode() : 0);
+        return result;
+    }
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
Thu Mar  1 00:24:41 2012
@@ -66,4 +66,5 @@ TopicStatusDTO
 ValueDTO
 VirtualHostDTO
 VirtualHostStatusDTO
-WebAdminDTO
\ No newline at end of file
+WebAdminDTO
+UdpDTO
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Thu Mar
 1 00:24:41 2012
@@ -27,5 +27,6 @@
   </virtual_host>
 
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
+  <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Thu Mar  1 00:24:41 2012
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo.stomp
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.BeforeAndAfterEach
 import java.lang.String
-import java.net.InetSocketAddress
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker.{LocalRouter, KeyStorage, Broker, BrokerFactory}
 import java.util.concurrent.TimeUnit._
@@ -27,6 +26,9 @@ import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.dto.{QueueStatusDTO, TopicStatusDTO, KeyStorageDTO}
 import java.util.concurrent.atomic.AtomicLong
 import FileSupport._
+import java.net.{DatagramSocket, InetSocketAddress}
+import java.nio.channels.DatagramChannel
+import org.fusesource.hawtbuf.AsciiBuffer
 
 class StompTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach
with Logging {
   var broker: Broker = null
@@ -59,10 +61,12 @@ class StompTestSupport extends FunSuiteS
     clients = Nil
   }
 
+  def connector_port(connector:String):Option[Int] = Option(connector).map { id =>
+    broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(port)
+  }
+  
   def connect_request(version:String, c: StompClient, headers:String="", connector:String=null)
= {
-    val p = Option(connector).map{ id =>
-      broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(port)
-    }.getOrElse(port)
+    val p = connector_port(connector).getOrElse(port)
     c.open("localhost", p)
     version match {
       case "1.0"=>
@@ -2374,5 +2378,21 @@ class StompTempDestinationTest extends S
     frame should startWith("MESSAGE\n")
     frame should include("reply-to:sms:8139993334444\n")
   }
+}
+
+class StompUdpInteropTest extends StompTestSupport {
+
+  test("UDP to STOMP interop") {
+    
+    connect("1.1")
+    subscribe("0", "/topic/udp")
+
+    val udp_port:Int = connector_port("udp").get
+    val channel = DatagramChannel.open();
 
+    val target = new InetSocketAddress("127.0.0.1", udp_port)
+    channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
+
+    assert_received("Hello")
+  }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu Mar  1 00:24:41 2012
@@ -96,7 +96,7 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtdispatch-version>1.9</hawtdispatch-version>
+    <hawtdispatch-version>1.10-SNAPSHOT</hawtdispatch-version>
     <hawtbuf-version>1.9</hawtbuf-version>
     <stompjms-version>1.9</stompjms-version>
     



Mime
View raw message