activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject [09/14] git commit: Change the any protocol so that it swaps out the codec and handler when the AnyProtocolHandler receives the ProtocolDetected command.
Date Fri, 11 Oct 2013 19:14:13 GMT
Change the any protocol so that it swaps out the codec and handler when the AnyProtocolHandler
receives the ProtocolDetected command.

Also, the any protocol no longer passes the new codec down to the next protocol handler,
so don't try to handle that case anymore.

git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-apollo/trunk@1508929 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/1a93a89a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/1a93a89a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/1a93a89a

Branch: refs/heads/trunk
Commit: 1a93a89a911f946e1d6f08e2166ad99baec4904d
Parents: 5a36e9e
Author: Hiram R. Chirino <chirino@apache.org>
Authored: Wed Jul 31 15:31:51 2013 +0000
Committer: Hiram R. Chirino <chirino@apache.org>
Committed: Wed Jul 31 15:31:51 2013 +0000

----------------------------------------------------------------------
 .../apollo/broker/protocol/AnyProtocol.scala    | 50 ++++++++++----------
 .../apollo/mqtt/MqttProtocolHandler.java        |  5 +-
 .../openwire/OpenwireProtocolHandler.scala      |  3 --
 .../apollo/stomp/StompProtocolHandler.scala     |  3 --
 4 files changed, 27 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/1a93a89a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
----------------------------------------------------------------------
diff --git a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
index 84e1e81..3e21c70 100644
--- a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
+++ b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
@@ -62,7 +62,7 @@ object AnyProtocol extends BaseProtocol {
 
 }
 
-case class ProtocolDetected(id:String, codec:ProtocolCodec)
+case class ProtocolDetected(id:String)
 
 class AnyProtocolCodec(val connector:Connector) extends ProtocolCodec {
 
@@ -82,18 +82,15 @@ class AnyProtocolCodec(val connector:Connector) extends ProtocolCodec {
 
 
   def read: AnyRef = {
-    if (next != null) {
-      throw new IllegalStateException
+    if (next!=null) {
+      return next.read()
     }
 
     channel.read(buffer)
     val buff = new Buffer(buffer.array(), 0, buffer.position())
     protocols.foreach {protocol =>
       if (protocol.matchesIdentification(buff)) {
-        next = protocol.createProtocolCodec(connector)
-        AnyProtocol.change_protocol_codec(transport, next)
-        next.unread(buff.toByteArray)
-        return ProtocolDetected(protocol.id, next)
+        return ProtocolDetected(protocol.id)
       }
     }
     if (buffer.position() == buffer.capacity) {
@@ -141,27 +138,32 @@ class AnyProtocolHandler extends ProtocolHandler {
   def async_die(client_message:String) = connection.stop(NOOP)
 
   override def on_transport_command(command: AnyRef) = {
-  def async_die(client_message:String) = connection.stop(NOOP)
-
-    if (!command.isInstanceOf[ProtocolDetected]) {
-      throw new ProtocolException("Expected a ProtocolDetected object");
+    if( discriminated ) {
+      throw new ProtocolException("Protocol already discriminated");
     }
 
-    discriminated = true
-
-    var protocol: ProtocolDetected = command.asInstanceOf[ProtocolDetected];
-    val protocol_handler = ProtocolFactory.get(protocol.id) match {
-      case Some(x) => x.createProtocolHandler(connection.connector)
-      case None =>
-        throw new ProtocolException("No protocol handler available for protocol: " + protocol.id);
-    }
+    command match {
+      case detected:ProtocolDetected =>
+        discriminated = true
+        val protocol = ProtocolFactory.get(detected.id).getOrElse(throw new ProtocolException("No protocol handler available for protocol: " + detected.id))
+        val protocol_handler = protocol.createProtocolHandler(connection.connector)
+
+        // Swap out the protocol codec
+        val any_codec = connection.protocol_codec(classOf[AnyProtocolCodec])
+        val next = protocol.createProtocolCodec(connection.connector)
+        AnyProtocol.change_protocol_codec(connection.transport, next)
+        val buff = new Buffer(any_codec.buffer.array(), 0, any_codec.buffer.position())
+        next.unread(buff.toByteArray)
 
-     // replace the current handler with the new one.
-    connection.protocol_handler = protocol_handler
-    connection.transport.suspendRead
+        // Swap out the protocol handler.
+        connection.protocol_handler = protocol_handler
+        connection.transport.suspendRead
+        protocol_handler.set_connection(connection);
+        connection.transport.getTransportListener.onTransportConnected()
 
-    protocol_handler.set_connection(connection);
-    connection.transport.getTransportListener.onTransportConnected()
+      case _ =>
+        throw new ProtocolException("Expected a ProtocolDetected object");
+    }
   }
 
   override def on_transport_connected = {

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/1a93a89a/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
----------------------------------------------------------------------
diff --git a/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java b/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
index 1f27f15..e9714be 100644
--- a/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
+++ b/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
@@ -402,10 +402,7 @@ public class MqttProtocolHandler extends AbstractProtocolHandler {
         return new UnitFn1<Object>() {
             @Override
             public void call(Object o) {
-                if (o instanceof MQTTProtocolCodec) {
-                    // this is passed on to us by the protocol discriminator
-                    // so we know which wire format is being used.
-                } else if (o instanceof MQTTFrame) {
+                if (o instanceof MQTTFrame) {
                     MQTTFrame command = (MQTTFrame) o;
                     try {
                         if (command.messageType() == CONNECT.TYPE) {

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/1a93a89a/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala b/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
index 52f85cc..cde434e 100644
--- a/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
+++ b/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
@@ -284,9 +284,6 @@ class OpenwireProtocolHandler extends ProtocolHandler {
       trace("received: %s", command)
       if (wire_format == null) {
         command match {
-          case codec: OpenwireCodec =>
-            // this is passed on to us by the protocol discriminator
-            // so we know which wire format is being used.
           case command: WireFormatInfo =>
             on_wire_format_info(command)
           case _ =>

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/1a93a89a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
index ef33a8c..7854823 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
@@ -989,9 +989,6 @@ class StompProtocolHandler extends ProtocolHandler {
     }
     try {
       command match {
-        case s:StompCodec =>
-          // this is passed on to us by the protocol discriminator
-          // so we know which wire format is being used.
         case f:StompFrame=>
 
           trace("received frame: %s", f)


Mime
View raw message