activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1461905 [1/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-mqtt/src/main/scala/org/apache/activemq/apo...
Date Wed, 27 Mar 2013 23:58:59 GMT
Author: chirino
Date: Wed Mar 27 23:58:58 2013
New Revision: 1461905

URL: http://svn.apache.org/r1461905
Log:
Porting more logic from scala to java.

Added:
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java
      - copied, changed from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala
      - copied, changed from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1461905&r1=1461904&r2=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Mar 27 23:58:58 2013
@@ -107,6 +107,19 @@ trait SinkMapper[T,X] extends Sink[T] wi
   def passing(value:T):X
 }
 
+abstract class AbstractSinkMapper[T,X] extends Sink[T] with SinkFilter[X] {
+  def offer(value:T) = {
+    if( full ) {
+      false
+    } else {
+      val accepted:Boolean = downstream.offer(passing(value))
+      assert(accepted, "The downstream sink violated it's contract, an offer was not accepted but it had told us it was not full")
+      accepted
+    }
+  }
+  def passing(value:T):X
+}
+
 /**
  * <p>
  * A delivery sink which is connected to a transport. It expects the caller's dispatch

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1461905&r1=1461904&r2=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed Mar 27 23:58:58 2013
@@ -107,6 +107,8 @@ trait ProtocolHandler {
 
 }
 
+abstract class AbstractProtocolHandler extends ProtocolHandler
+
 @deprecated(message="Please use the ProtocolFilter2 interface instead", since="1.3")
 trait ProtocolFilter {
   def filter[T](command: T):T
@@ -161,7 +163,11 @@ object SimpleProtocolFilter2Factory exte
 }
 
 object ProtocolFilter2 {
-  def create_filters(dtos:List[ProtocolFilterDTO], handler:ProtocolHandler) = {
+  def create_filters(dtos:java.util.List[ProtocolFilterDTO], handler:ProtocolHandler):java.util.List[ProtocolFilter2] = {
+    import collection.JavaConversions._
+    collection.JavaConversions.seqAsJavaList(create_filters(dtos.toList, handler));
+  }
+  def create_filters(dtos:List[ProtocolFilterDTO], handler:ProtocolHandler):List[ProtocolFilter2] = {
     dtos.map(ProtocolFilter2Factory.create(_, handler))
   }
 }

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java?rev=1461905&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java Wed Mar 27 23:58:58 2013
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.*;
+import org.apache.activemq.apollo.broker.protocol.AbstractProtocolHandler;
+import org.apache.activemq.apollo.broker.protocol.ProtocolFilter2;
+import org.apache.activemq.apollo.broker.protocol.ProtocolFilter2$;
+import org.apache.activemq.apollo.broker.security.SecurityContext;
+import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
+import org.apache.activemq.apollo.dto.ProtocolDTO;
+import org.apache.activemq.apollo.mqtt.dto.MqttConnectionStatusDTO;
+import org.apache.activemq.apollo.mqtt.dto.MqttDTO;
+import org.apache.activemq.apollo.util.*;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.hawtdispatch.transport.HeartBeatMonitor;
+import org.fusesource.mqtt.codec.*;
+import scala.Option;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import static org.apache.activemq.apollo.util.Scala2Java.*;
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttProtocolHandler extends AbstractProtocolHandler {
+
+    public static final Scala2Java.Logger log = new Scala2Java.Logger(Log$.MODULE$.apply(MqttProtocolHandler.class));
+
+    public static <T> T received(T value) {
+        log.trace("received: %s", value);
+        return value;
+    }
+
+    static Fn0<String> WAITING_ON_CLIENT_REQUEST = new Fn0<String>() {
+        @Override
+        public String apply() {
+            return "client request";
+        }
+    };
+
+    public String protocol() {
+        return "mqtt";
+    }
+
+    public Broker broker() {
+        return connection().connector().broker();
+    }
+
+    public DispatchQueue queue() {
+        return connection().dispatch_queue();
+    }
+
+    Scala2Java.Logger connection_log = log;
+    MqttDTO config = null;
+
+    public DestinationParser destination_parser() {
+        DestinationParser destination_parser = MqttProtocol.destination_parser;
+        if (config.queue_prefix != null ||
+                config.path_separator != null ||
+                config.any_child_wildcard != null ||
+                config.any_descendant_wildcard != null ||
+                config.regex_wildcard_start != null ||
+                config.regex_wildcard_end != null ||
+                config.part_pattern != null
+                ) {
+            destination_parser = new DestinationParser().copy(destination_parser);
+            if (config.queue_prefix != null) {
+                destination_parser.queue_prefix_$eq(config.queue_prefix);
+            }
+            if (config.path_separator != null) {
+                destination_parser.path_separator_$eq(config.path_separator);
+            }
+            if (config.any_child_wildcard != null) {
+                destination_parser.any_child_wildcard_$eq(config.any_child_wildcard);
+            }
+            if (config.any_descendant_wildcard != null) {
+                destination_parser.any_descendant_wildcard_$eq(config.any_descendant_wildcard);
+            }
+            if (config.regex_wildcard_start != null) {
+                destination_parser.regex_wildcard_start_$eq(config.regex_wildcard_start);
+            }
+            if (config.regex_wildcard_end != null) {
+                destination_parser.regex_wildcard_end_$eq(config.regex_wildcard_end);
+            }
+            if (config.part_pattern != null) {
+                destination_parser.part_pattern_$eq(Pattern.compile(config.part_pattern));
+            }
+        }
+        return destination_parser;
+    }
+
+    final ArrayList<ProtocolFilter2> protocol_filters = new ArrayList<ProtocolFilter2>();
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits related setting up a client connection
+    //
+    /////////////////////////////////////////////////////////////////////
+    public Option<String> session_id() {
+        return security_context.session_id();
+    }
+
+    final SecurityContext security_context = new SecurityContext();
+    SinkMux<Request> sink_manager = null;
+    Sink<Request> connection_sink = null;
+    MQTTProtocolCodec codec = null;
+
+
+    static private MqttDTO find_config(AcceptingConnectorDTO connector_config) {
+        for (ProtocolDTO protocol : connector_config.protocols) {
+            if (protocol instanceof MqttDTO) {
+                return (MqttDTO) protocol;
+            }
+        }
+        return new MqttDTO();
+    }
+
+    public void on_transport_connected() {
+
+        codec = (MQTTProtocolCodec) connection().transport().getProtocolCodec();
+        AcceptingConnectorDTO connector_config = (AcceptingConnectorDTO) connection().connector().config();
+        config = find_config(connector_config);
+
+        codec.setMaxMessageLength(get(config.max_message_length, codec.getMaxMessageLength()));
+
+        protocol_filters.clear();
+        protocol_filters.addAll(ProtocolFilter2$.MODULE$.create_filters(config.protocol_filters, this));
+
+        security_context.local_address_$eq(connection().transport().getLocalAddress());
+        security_context.remote_address_$eq(connection().transport().getRemoteAddress());
+        security_context.connector_id_$eq(connection().connector().id());
+        security_context.certificates_$eq(connection().certificates());
+
+        connection_log = new Scala2Java.Logger(connection().connector().broker().connection_log());
+
+        Sink<Request> filtering_sink = new AbstractSinkMapper<Request, Object>() {
+            public Sink<Object> downstream() {
+                return (Sink<Object>) connection().transport_sink();
+            }
+
+            public MQTTFrame passing(Request request) {
+                log.trace("sent: %s", request.message);
+                request.delivered = true;
+                if (request.id == 0 && request.ack != null) {
+                    request.ack.apply(Consumed$.MODULE$);
+                }
+                return request.frame;
+
+            }
+        };
+
+        if (!protocol_filters.isEmpty()) {
+            filtering_sink = filtering_sink.flatMap(toScala(new Fn1<Request, Option<Request>>() {
+                @Override
+                public Option<Request> apply(Request x) {
+                    Option<Request> cur = some(x);
+                    for (ProtocolFilter2 filter : protocol_filters) {
+                        if (cur.isDefined()) {
+                            cur = filter.filter_outbound(cur.get());
+                        }
+                    }
+                    return cur;
+                }
+            }));
+        }
+        sink_manager = new SinkMux<Request>(filtering_sink);
+        connection_sink = new OverflowSink(sink_manager.open());
+        resume_read();
+    }
+
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits related tearing down a client connection
+    //
+    /////////////////////////////////////////////////////////////////////
+    boolean closed = false;
+    public static UnitFn1<Object> dead_handler = new UnitFn1<Object>() {
+        @Override
+        public void call(Object v1) {
+        }
+    };
+
+    public void on_transport_disconnected() {
+        if (!closed) {
+            closed = true;
+            dead = true;
+            command_handler = dead_handler;
+
+            security_context.logout(toScala(new UnitFn1<Throwable>() {
+                @Override
+                public void call(Throwable e) {
+                    if (e != null) {
+                        connection_log.info(e, "MQTT connection '%s' log out error: %s", security_context.remote_address(), e.toString());
+                    }
+                }
+            }));
+
+            heart_beat_monitor.stop();
+            if (!connection().stopped()) {
+                connection().stop(NOOP);
+            }
+            log.trace("mqtt protocol resources released");
+        }
+    }
+
+    public void on_transport_failure(IOException error) {
+        if (!dead) {
+            command_handler.apply("failure");
+            dead = true;
+            command_handler = dead_handler;
+            if (!connection().stopped()) {
+                connection_log.info(error, "Shutting connection '%s'  down due to: %s", security_context.remote_address(), error);
+                super.on_transport_failure(error);
+            }
+        }
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits related managing connection flow control
+    //
+    /////////////////////////////////////////////////////////////////////
+
+    Fn0<String> status = WAITING_ON_CLIENT_REQUEST;
+
+    public void _suspend_read(final String reason) {
+        suspend_read(new Fn0<String>() {
+            @Override
+            public String apply() {
+                return reason;
+            }
+        });
+    }
+
+    public void suspend_read(Fn0<String> reason) {
+        status = reason;
+        connection().transport().suspendRead();
+        heart_beat_monitor.suspendRead();
+    }
+
+    public void resume_read() {
+        status = WAITING_ON_CLIENT_REQUEST;
+        connection().transport().resumeRead();
+        heart_beat_monitor.resumeRead();
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits related to raising connection failure signals
+    //
+    /////////////////////////////////////////////////////////////////////
+
+    boolean dead = false;
+
+    public long die_delay() {
+        return get(config.die_delay, 1000 * 5L);
+    }
+
+    class Break extends RuntimeException {
+    }
+
+    public void async_die(String msg) {
+        async_die(msg, null);
+    }
+
+    public void async_die(String msg, Throwable e) {
+        try {
+            die(msg, e);
+        } catch (Break x) {
+        }
+    }
+
+    public void async_die(MessageSupport.Message response, String msg) {
+        try {
+            die(response, msg, null);
+        } catch (Break x) {
+        }
+    }
+
+    public <T> T die(String msg) {
+        return die(null, msg, null);
+    }
+
+    public <T> T die(String msg, Throwable e) {
+        return die(null, msg, e);
+    }
+
+    public <T> T die(MessageSupport.Message response, String msg) {
+        return die(response, msg, null);
+    }
+
+    public <T> T die(MessageSupport.Message response, String msg, Throwable e) {
+        if (e != null) {
+            connection_log.info(e, "MQTT connection '%s' error: %s", security_context.remote_address(), msg, e);
+        } else {
+            connection_log.info("MQTT connection '%s' error: %s", security_context.remote_address(), msg);
+        }
+        return die(response);
+    }
+
+    public <T> T die(MessageSupport.Message response) {
+        if (!dead) {
+            command_handler.apply("failure");
+            dead = true;
+            command_handler = dead_handler;
+            status = new Fn0<String>() {
+                @Override
+                public String apply() {
+                    return "shuting down";
+                }
+            };
+            if (response != null) {
+                connection().transport().resumeRead();
+                connection_sink.offer(new Request((short) 0, response, null));
+                // TODO: if there are too many open connections we should just close the connection
+                // without waiting for the error to get sent to the client.
+                queue().executeAfter(die_delay(), TimeUnit.MILLISECONDS, new Task() {
+                    @Override
+                    public void run() {
+                        connection().stop(NOOP);
+                    }
+                });
+            } else {
+                connection().stop(NOOP);
+            }
+        }
+        throw new Break();
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits for dispatching client requests.
+    //
+    /////////////////////////////////////////////////////////////////////
+    UnitFn1<Object> command_handler = connect_handler();
+
+    public void on_transport_command(Object command) {
+        try {
+            if (!protocol_filters.isEmpty()) {
+                for (ProtocolFilter2 filter : protocol_filters) {
+                    Option<Object> opt = filter.filter_inbound(command);
+                    if (opt.isDefined()) {
+                        command = opt.get();
+                    } else {
+                        return; // dropping the frame.
+                    }
+                }
+            }
+            command_handler.apply(command);
+        } catch (Break e) {
+        } catch (Exception e) {
+            // To avoid double logging to the same log category..
+            String msg = "Internal Server Error: " + e;
+            if (connection_log != MqttProtocolHandler.log) {
+                // but we also want the error on the apollo.log file.
+                MqttProtocolHandler.log.warn(e, msg);
+            }
+            async_die(msg, e);
+        }
+    }
+
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits related establishing the client connection
+    //
+    /////////////////////////////////////////////////////////////////////
+
+    CONNECT connect_message = null;
+    HeartBeatMonitor heart_beat_monitor = new HeartBeatMonitor();
+    VirtualHost host = null;
+
+    public UnitFn1<Object> connect_handler() {
+        return new UnitFn1<Object>() {
+            @Override
+            public void call(Object o) {
+                if (o instanceof MQTTProtocolCodec) {
+                    // this is passed on to us by the protocol discriminator
+                    // so we know which wire format is being used.
+                } else if (o instanceof MQTTFrame) {
+                    MQTTFrame command = (MQTTFrame) o;
+                    try {
+                        if (command.messageType() == CONNECT.TYPE) {
+                            connect_message = received(new CONNECT().decode(command));
+                            on_mqtt_connect();
+                        } else {
+                            die("Expecting an MQTT CONNECT message, but got: " + command.getClass());
+                        }
+                    } catch (java.net.ProtocolException e) {
+                        die("Internal Server Error: bad mqtt command: " + command);
+                    }
+                } else if ("failure".equals(o)) {
+                } else {
+                    die("Internal Server Error: unexpected mqtt command: " + o.getClass());
+                }
+            }
+        };
+    }
+
+    public void on_mqtt_connect() {
+
+        final CONNACK connack = new CONNACK();
+
+        if (connect_message.version() != 3) {
+            connack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION);
+            die(connack, "Unsupported protocol version: " + connect_message.version());
+        }
+
+        UTF8Buffer client_id = connect_message.clientId();
+        security_context.user_$eq(Scala2Java.toString(connect_message.userName()));
+        security_context.password_$eq(Scala2Java.toString(connect_message.password()));
+        security_context.session_id_$eq(Scala2Java.some(client_id.toString()));
+
+        final short keep_alive = connect_message.keepAlive();
+        if (keep_alive > 0) {
+            heart_beat_monitor.setReadInterval(((long) (keep_alive * 1.5)) * 1000);
+            heart_beat_monitor.setOnDead(new Task() {
+                @Override
+                public void run() {
+                    async_die("Missed keep alive set to " + keep_alive + " seconds");
+                }
+            });
+        }
+        heart_beat_monitor.suspendRead();
+        heart_beat_monitor.setTransport(connection().transport());
+        heart_beat_monitor.start();
+
+        _suspend_read("virtual host lookup");
+        broker().dispatch_queue().execute(new Task() {
+            @Override
+            public void run() {
+                host = connection().connector().broker().get_default_virtual_host();
+                queue().execute(new Task() {
+                    @Override
+                    public void run() {
+                        resume_read();
+                        if (host == null) {
+                            connack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                            async_die(connack, "Default virtual host not found.");
+                        } else if (!host.service_state().is_started()) {
+                            connack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                            async_die(connack, "Default virtual host stopped.");
+                        } else {
+                            connection_log = new Scala2Java.Logger(host.connection_log());
+                            if (host.authenticator() != null && host.authorizer() != null) {
+                                _suspend_read("authenticating and authorizing connect");
+                                host.authenticator().authenticate(security_context, toScala(new UnitFn1<String>() {
+                                    public void call(final String auth_err) {
+                                        queue().execute(new Task() {
+                                            @Override
+                                            public void run() {
+                                                if (auth_err != null) {
+                                                    connack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
+                                                    async_die(connack, auth_err + ". Credentials=" + security_context.credential_dump());
+                                                } else if (!host.authorizer().can(security_context, "connect", connection().connector())) {
+                                                    connack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
+                                                    async_die(connack, String.format("Not authorized to connect to connector '%s'. Principals=", connection().connector().id(), security_context.principal_dump()));
+                                                } else if (!host.authorizer().can(security_context, "connect", host)) {
+                                                    connack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
+                                                    async_die(connack, String.format("Not authorized to connect to virtual host '%s'. Principals=", host.id(), security_context.principal_dump()));
+                                                } else {
+                                                    resume_read();
+                                                    on_host_connected(host);
+                                                }
+                                            }
+                                        });
+                                    }
+                                }));
+                            } else {
+                                on_host_connected(host);
+                            }
+                        }
+                    }
+                });
+            }
+        });
+    }
+
+    public void on_host_connected(VirtualHost host) {
+        MqttSessionManager.attach(host, connect_message.clientId(), this);
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Other msic bits.
+    //
+    /////////////////////////////////////////////////////////////////////
+    LongCounter messages_sent = new LongCounter(0);
+    LongCounter messages_received = new LongCounter(0);
+    int subscription_count = 0;
+
+    public MqttConnectionStatusDTO create_connection_status() {
+        MqttConnectionStatusDTO rc = new MqttConnectionStatusDTO();
+        rc.protocol_version = "3.1";
+        rc.messages_sent = messages_sent.get();
+        rc.messages_received = messages_received.get();
+        rc.subscription_count = subscription_count;
+        rc.waiting_on = status.apply();
+        return rc;
+    }
+
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1461905&r1=1461904&r2=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Wed Mar 27 23:58:58 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.apollo.mqtt
 
+
 import org.fusesource.hawtbuf._
 import dto.{MqttConnectionStatusDTO, MqttDTO}
 import org.fusesource.hawtdispatch._
@@ -46,1266 +47,4 @@ import scala.Some
 import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState
 import org.apache.activemq.apollo.broker.SubscriptionAddress
 
-case class Request(id:Short, message:MessageSupport.Message, ack:UnitFn1[DeliveryResult]) {
-  val frame = if(message==null) null else message.encode()
-  var delivered = false
-}
-
-object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
-  def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
-}
-
-object MqttProtocolHandler extends Log {
-  
-
-  def received[T](value:T):T = {
-    trace("received: %s", value)
-    value
-  }
-
-  val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
-
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class MqttProtocolHandler extends ProtocolHandler {
-  import MqttProtocolHandler._
-
-  def protocol = "mqtt"
-
-  def broker = connection.connector.broker
-  def queue = connection.dispatch_queue
-
-  var connection_log:Log = MqttProtocolHandler
-  var config:MqttDTO = _
-
-  def destination_parser = {
-    var destination_parser = MqttProtocol.destination_parser
-    if( config.queue_prefix!=null ||
-        config.path_separator!= null ||
-        config.any_child_wildcard != null ||
-        config.any_descendant_wildcard!= null ||
-        config.regex_wildcard_start!= null ||
-        config.regex_wildcard_end!= null ||
-        config.part_pattern!= null
-    ) {
-      destination_parser = new DestinationParser().copy(destination_parser)
-      if( config.queue_prefix!=null ) { destination_parser.queue_prefix = config.queue_prefix }
-      if( config.path_separator!=null ) { destination_parser.path_separator = config.path_separator }
-      if( config.any_child_wildcard!=null ) { destination_parser.any_child_wildcard = config.any_child_wildcard }
-      if( config.any_descendant_wildcard!=null ) { destination_parser.any_descendant_wildcard = config.any_descendant_wildcard }
-      if( config.regex_wildcard_start!=null ) { destination_parser.regex_wildcard_start = config.regex_wildcard_start }
-      if( config.regex_wildcard_end!=null ) { destination_parser.regex_wildcard_end = config.regex_wildcard_end }
-      if( config.part_pattern!=null ) { destination_parser.part_pattern = Pattern.compile(config.part_pattern) }
-    }
-    destination_parser
-  }
-
-  var protocol_filters = List[ProtocolFilter2]()
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits related setting up a client connection
-  //
-  /////////////////////////////////////////////////////////////////////
-  def session_id = security_context.session_id
-
-  val security_context = new SecurityContext
-  var sink_manager:SinkMux[Request] = null
-  var connection_sink:Sink[Request] = null
-  var codec:MQTTProtocolCodec = _
-
-  override def on_transport_connected() = {
-    import collection.JavaConversions._
-
-    codec = connection.transport.getProtocolCodec.asInstanceOf[MQTTProtocolCodec]
-    val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
-    config = connector_config.protocols.find( _.isInstanceOf[MqttDTO]).map(_.asInstanceOf[MqttDTO]).getOrElse(new MqttDTO)
-    import OptionSupport._
-    config.max_message_length.foreach( codec.setMaxMessageLength(_) )
-
-    import collection.JavaConversions._
-    protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
-
-    security_context.local_address = connection.transport.getLocalAddress
-    security_context.remote_address = connection.transport.getRemoteAddress
-    security_context.connector_id = connection.connector.id
-    security_context.certificates = connection.certificates
-
-    connection_log = connection.connector.broker.connection_log
-    var filtering_sink:Sink[Request] = connection.transport_sink.map { request =>
-      trace("sent: %s", request.message)
-      request.delivered = true
-      if (request.id == 0 && request.ack != null) {
-        request.ack(Consumed)
-      }
-      request.frame
-    }
-    if(!protocol_filters.isEmpty) {
-      filtering_sink = filtering_sink.flatMap {x=>
-        var cur = Option(x)
-        protocol_filters.foreach { filter =>
-          cur = cur.flatMap(filter.filter_outbound(_))
-        }
-        cur
-      }
-    }
-    sink_manager = new SinkMux[Request](filtering_sink)
-    connection_sink = new OverflowSink(sink_manager.open());
-    resume_read
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits related tearing down a client connection
-  //
-  /////////////////////////////////////////////////////////////////////
-  var closed = false
-  def dead_handler(command:AnyRef):Unit = {}
-
-  override def on_transport_disconnected() = {
-    if( !closed ) {
-      closed=true;
-      dead = true;
-      command_handler = dead_handler _
-
-      security_context.logout( e => {
-        if(e!=null) {
-          connection_log.info(e, "MQTT connection '%s' log out error: %s", security_context.remote_address, e.toString)
-        }
-      })
-
-      heart_beat_monitor.stop
-      if( !connection.stopped ) {
-        connection.stop(NOOP)
-      }
-      trace("mqtt protocol resources released")
-    }
-  }
-
-  override def on_transport_failure(error: IOException) = {
-    if( !dead ) {
-      command_handler("failure")
-      dead = true
-      command_handler = dead_handler _
-      if( !connection.stopped ) {
-        connection_log.info(error, "Shutting connection '%s'  down due to: %s", security_context.remote_address, error)
-        super.on_transport_failure(error);
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits related managing connection flow control
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  var status = WAITING_ON_CLIENT_REQUEST
-  def _suspend_read(reason: String) = suspend_read(reason)
-  def suspend_read(reason: => String) = {
-    status = reason _
-    connection.transport.suspendRead
-    heart_beat_monitor.suspendRead
-  }
-
-  def resume_read() = {
-    status = WAITING_ON_CLIENT_REQUEST
-    connection.transport.resumeRead
-    heart_beat_monitor.resumeRead
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits related to raising connection failure signals
-  //
-  /////////////////////////////////////////////////////////////////////
-
-  var dead = false
-  def die_delay = {
-    import OptionSupport._
-    config.die_delay.getOrElse(1000*5L)
-  }
-
-  class Break extends RuntimeException
-
-  def async_die(msg:String, e:Throwable=null):Unit = try {
-    die(msg, e)
-  } catch {
-    case x:Break=>
-  }
-
-  def async_die(response:MessageSupport.Message, msg:String):Unit = try {
-    die(response, msg, null)
-  } catch {
-    case x:Break=>
-  }
-
-  def die[T](msg:String):T = die(null, msg, null)
-  def die[T](msg:String, e:Throwable):T = die(null, msg, e)
-  def die[T](response:MessageSupport.Message, msg:String):T = die(response, msg, null)
-  def die[T](response:MessageSupport.Message, msg:String, e:Throwable):T = {
-    if( e!=null) {
-      connection_log.info(e, "MQTT connection '%s' error: %s", security_context.remote_address, msg, e)
-    } else {
-      connection_log.info("MQTT connection '%s' error: %s", security_context.remote_address, msg)
-    }
-    die(response)
-  }
-
-  def die[T](response:MessageSupport.Message):T = {
-    if( !dead ) {
-      command_handler("failure")
-      dead = true
-      command_handler = dead_handler _
-      status = ()=>"shuting down"
-      if( response!=null ) {
-        connection.transport.resumeRead
-        connection_sink.offer(Request(0, response, null))
-        // TODO: if there are too many open connections we should just close the connection
-        // without waiting for the error to get sent to the client.
-        queue.after(die_delay, TimeUnit.MILLISECONDS) {
-          connection.stop(NOOP)
-        }
-      } else {
-        connection.stop(NOOP)
-      }
-    }
-    throw new Break()
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits for dispatching client requests.
-  //
-  /////////////////////////////////////////////////////////////////////
-  var command_handler: (AnyRef)=>Unit = connect_handler _
-
-  override def on_transport_command(command:AnyRef):Unit = {
-    try {
-
-      var f = command
-      val frame = if(!protocol_filters.isEmpty) {
-        var cur = Option(f)
-        protocol_filters.foreach { filter =>
-          cur = cur.flatMap(filter.filter_inbound(_))
-        }
-        cur match {
-          case Some(f) => f
-          case None => return // dropping the frame.
-        }
-      } else {
-        f
-      }
-
-      command_handler(frame)
-    }  catch {
-      case e: Break =>
-      case e:Exception =>
-        // To avoid double logging to the same log category..
-        var msg: String = "Internal Server Error: " + e
-        if( connection_log!=MqttProtocolHandler ) {
-          // but we also want the error on the apollo.log file.
-          warn(e, msg)
-        }
-        async_die(msg, e);
-    }
-  }
-
-  
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Bits related establishing the client connection
-  //
-  /////////////////////////////////////////////////////////////////////
-  
-  var connect_message:CONNECT = _
-  var heart_beat_monitor = new HeartBeatMonitor
-  var host:VirtualHost = _
-
-  def connect_handler(command:AnyRef):Unit = command match {
-    case s:MQTTProtocolCodec =>
-      // this is passed on to us by the protocol discriminator
-      // so we know which wire format is being used.
-    case command:MQTTFrame=>
-
-      command.messageType() match {
-        case CONNECT.TYPE =>
-          connect_message = received(new CONNECT().decode(command)) 
-          on_mqtt_connect
-        case _ =>
-          die("Expecting an MQTT CONNECT message, but got: "+command.getClass);
-      }
-    case "failure" =>
-    case _=>
-      die("Internal Server Error: unexpected mqtt command: "+command.getClass);
-  }
-
-  def on_mqtt_connect:Unit = {
-    
-    val connack = new CONNACK
-
-    if(connect_message.version!=3) {
-      connack.code(CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION)
-      die(connack, "Unsupported protocol version: "+connect_message.version)
-    }
-    
-    val client_id = connect_message.clientId()
-    security_context.user = Option(connect_message.userName).map(_.toString).getOrElse(null)
-    security_context.password = Option(connect_message.password).map(_.toString).getOrElse(null)
-    security_context.session_id = Some(client_id.toString)
-
-    val keep_alive = connect_message.keepAlive
-    if( keep_alive > 0 ) {
-      heart_beat_monitor.setReadInterval((keep_alive*1.5).toLong*1000)
-      heart_beat_monitor.setOnDead(^{
-        async_die("Missed keep alive set to "+keep_alive+" seconds")
-      });
-    }
-    heart_beat_monitor.suspendRead()
-    heart_beat_monitor.setTransport(connection.transport)
-    heart_beat_monitor.start
-
-    suspend_read("virtual host lookup")
-    broker.dispatch_queue {
-      host = connection.connector.broker.get_default_virtual_host
-      queue {
-        resume_read
-        if(host==null) {
-          connack.code(CONNECTION_REFUSED_SERVER_UNAVAILABLE)
-          async_die(connack, "Default virtual host not found.")
-        } else if(!host.service_state.is_started) {
-          connack.code(CONNECTION_REFUSED_SERVER_UNAVAILABLE)
-          async_die(connack, "Default virtual host stopped.")
-        } else {
-          connection_log = host.connection_log
-          if( host.authenticator!=null &&  host.authorizer!=null ) {
-            suspend_read("authenticating and authorizing connect")
-            host.authenticator.authenticate(security_context) { auth_err =>
-              queue {
-                if( auth_err!=null ) {
-                  connack.code(CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD)
-                  async_die(connack, auth_err+". Credentials="+security_context.credential_dump)
-                } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
-                  connack.code(CONNECTION_REFUSED_NOT_AUTHORIZED)
-                  async_die(connack, "Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
-                } else if( !host.authorizer.can(security_context, "connect", host) ) {
-                  connack.code(CONNECTION_REFUSED_NOT_AUTHORIZED)
-                  async_die(connack, "Not authorized to connect to virtual host '%s'. Principals=".format(host.id, security_context.principal_dump))
-                } else {
-                  resume_read
-                  on_host_connected(host)
-                }
-              }
-            }
-          } else {
-            on_host_connected(host)
-          }
-        }
-      }
-    }
-  }
-  
-  def on_host_connected(host:VirtualHost):Unit = {
-    MqttSessionManager.attach(host, connect_message.clientId(), this)
-  }
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Other msic bits.
-  //
-  /////////////////////////////////////////////////////////////////////
-  val messages_sent = new LongCounter()
-  var messages_received = new LongCounter()
-  var subscription_count = 0
-
-  override def create_connection_status = {
-    var rc = new MqttConnectionStatusDTO
-    rc.protocol_version = "3.1"
-    rc.messages_sent = messages_sent.get()
-    rc.messages_received = messages_received.get
-    rc.subscription_count = subscription_count
-    rc.waiting_on = status()
-    rc
-  }
-
-}
-
-
-/**
- * Tracks active sessions so that we can ensure that a given
- * session id is only associated with once connection
- * at a time.  If a client tries to establish a 2nd
- * connection, the first one will be closed before the session
- * is switch to the new connection.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object MqttSessionManager {
-
-  val queue = createQueue("session manager")
-
-  class SessionState {
-    var durable_sub:SubscriptionAddress = _
-    val subscriptions = new java.util.HashMap[UTF8Buffer, (Topic, BindAddress)]()
-    val received_message_ids: HashSet[Short] = new HashSet[Short]
-
-    trait StorageStrategy {
-      def update(cb: Task)
-      def destroy(cb: Task)
-      def create(store:Store, client_id:UTF8Buffer)
-    }
-    case class NoopStrategy() extends StorageStrategy {
-      def update(cb: Task) = { cb.run() }
-      def destroy(cb: Task) { cb.run() }
-      def create(store:Store, client_id:UTF8Buffer) = {
-        if(store!=null)
-          strategy = StoreStrategy(store, client_id)
-      }
-    }
-
-    case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends StorageStrategy {
-      val session_key = new UTF8Buffer("mqtt:"+client_id)
-      def update(cb: Task) = {
-        val uow = store.create_uow
-        val session_pb = new SessionPB.Bean
-        session_pb.setClientId(client_id)
-        received_message_ids.foreach(session_pb.addReceivedMessageIds(_))
-
-        import collection.JavaConversions._
-        for( (topic, address)  <- subscriptions.values) {
-          val topic_pb = new TopicPB.Bean
-          topic_pb.setName(topic.name())
-          topic_pb.setQos(topic.qos().ordinal())
-          topic_pb.setAddress(new UTF8Buffer(address.toString))
-          session_pb.addSubscriptions(topic_pb)
-        }
-        uow.put(session_key, session_pb.freeze().toUnframedBuffer)
-
-        val current = getCurrentQueue
-        uow.on_complete {
-          current {
-            cb.run()
-          }
-        }
-        uow.release
-      }
-
-      def destroy(cb: Task) {
-        val uow = store.create_uow
-        uow.put(session_key, null)
-        val current = getCurrentQueue
-        uow.on_complete {
-          current {
-            strategy = NoopStrategy()
-            cb.run()
-          }
-        }
-        uow.release
-      }
-      def create(store:Store, client_id:UTF8Buffer) = {
-      }
-    }
-    var strategy:StorageStrategy = new NoopStrategy
-
-  }
-
-
-  case class HostState(host:VirtualHost) {
-    val session_states = HashMap[UTF8Buffer, SessionState]()
-    val sessions = HashMap[UTF8Buffer, MqttSession]()
-
-    var loaded = false;
-    def on_load(func: =>Unit) = {
-      if( loaded ) {
-        func
-      } else {
-        if(host.store!=null) {
-          // We load all the persisted session's from the host's store when we are first accessed.
-          queue.suspend()
-          host.store.get_prefixed_map_entries(new AsciiBuffer("mqtt:")) { entries =>
-            queue.resume()
-            queue {
-              for( (_, value) <- entries ) {
-                import collection.JavaConversions._
-                val session_pb = SessionPB.FACTORY.parseUnframed(value)
-                val session_state = new SessionState()
-                session_state.strategy.create(host.store, session_pb.getClientId)
-                if( session_pb.hasReceivedMessageIds ) {
-                  session_state.received_message_ids ++= session_pb.getReceivedMessageIdsList.map(_.toShort)
-                }
-                if( session_pb.hasSubscriptions ) {
-                  session_pb.getSubscriptionsList.foreach { sub =>
-                    val address = SimpleAddress(sub.getAddress.toString)
-                    val topic = new Topic(sub.getName, QoS.values()(sub.getQos))
-                    session_state.subscriptions += sub.getName -> (topic,address)
-                  }
-                }
-                session_states.put(session_pb.getClientId, session_state)
-              }
-              loaded = true
-              func
-            }
-          }
-        } else {
-          loaded = true
-          func
-        }
-      }
-    }
-  }
-
-  def attach(host:VirtualHost, client_id:UTF8Buffer, handler:MqttProtocolHandler) = queue {
-    val host_state = host.plugin_state(new HostState(host), classOf[HostState])
-    host_state.on_load {
-      host_state.sessions.get(client_id) match {
-        case Some(assignment) =>
-          assignment.connect(handler)
-        case None =>
-          val state = if( handler.connect_message.cleanSession() ) {
-            host_state.session_states.remove(client_id).getOrElse(new SessionState())
-          } else {
-            host_state.session_states.getOrElseUpdate(client_id, new SessionState())
-          }
-          val assignment = new MqttSession(host_state, client_id, state)
-          assignment.connect(handler)
-          host_state.sessions.put(client_id, assignment)
-      }
-    }
-  }
-
-  def disconnect(host_state:HostState, client_id:UTF8Buffer, handler:MqttProtocolHandler) = queue {
-    host_state.sessions.get(client_id) match {
-      case Some(assignment) => assignment.disconnect(handler)
-      case None => // Don't expect this to hit.
-    }
-  }
-
-  def remove(host_state:HostState, client_id:UTF8Buffer) = queue {
-    host_state.sessions.remove(client_id)
-  }
-}
 
-///**
-// * An MqttSession can be switch from one connection/protocol handler to another,
-// * but it will only be associated with one at a time. An MqttSession tracks
-// * the state of the communication with a client.
-// *
-// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-// */
-//case class MqttSession(host_state:HostState, client_id:UTF8Buffer, session_state:SessionState) {
-//
-//  import MqttProtocolHandler._
-//
-//  def host = host_state.host
-//
-//  val queue = createQueue("mqtt: "+client_id)
-//  var manager_disconnected = false
-//
-//  var handler:Option[MqttProtocolHandler] = None
-//  var security_context:SecurityContext = _
-//  var clean_session = false
-//  var connect_message:CONNECT = _
-//  var destination_parser = MqttProtocol.destination_parser
-//
-//  def connect(next:MqttProtocolHandler):Unit = queue {
-//    if(manager_disconnected) {
-//      // we are not the assignment anymore.. go to the session manager
-//      // again to setup a new session.
-//      MqttSessionManager.attach(host, client_id, next)
-//    } else {
-//
-//      // so that we don't switch again until this current switch completes
-//      queue.suspend()
-//      if( handler != None ) {
-//        detach
-//        handler = None
-//      }
-//      queue {
-//        handler=Some(next)
-//        attach
-//      }
-//
-//      // switch the connection to the session queue..
-//      next.connection.set_dispatch_queue(queue) {
-//        queue.resume()
-//      }
-//    }
-//  }
-//
-//  def disconnect(prev:MqttProtocolHandler) = queue {
-//    if( handler==Some(prev) ) {
-//      MqttSessionManager.remove(host_state, client_id)
-//      manager_disconnected = true
-//      detach
-//      handler = None
-//    }
-//  }
-//  /////////////////////////////////////////////////////////////////////
-//  //
-//  // Bits that deal with connections attaching/detaching from the session
-//  //
-//  /////////////////////////////////////////////////////////////////////
-//  def attach = {
-//    queue.assertExecuting()
-//    val h = handler.get
-//    clean_session = h.connect_message.cleanSession()
-//    security_context = h.security_context
-//    h.command_handler = on_transport_command _
-//    destination_parser = h.destination_parser
-//    mqtt_consumer.consumer_sink.downstream = Some(h.sink_manager.open)
-//
-//    def ack_connect = {
-//      queue.assertExecuting()
-//      connect_message = h.connect_message
-//      val connack = new CONNACK
-//      connack.code(CONNECTION_ACCEPTED)
-//      send(connack)
-//    }
-//
-//    if( !clean_session ) {
-//      // Setup the previous subscriptions..
-//      session_state.strategy.create(host.store, client_id)
-//      if( !session_state.subscriptions.isEmpty ) {
-//        h.suspend_read("subscribing")
-//        subscribe(session_state.subscriptions.map(_._2._1)) {
-//          h.resume_read()
-//          h.queue {
-//            ack_connect
-//          }
-//        }
-//      } else {
-//        ack_connect
-//      }
-//    } else {
-//      // do we need to clear the received ids?
-//      // durable_session_state.received_message_ids.clear()
-//      session_state.subscriptions.clear()
-//      if( session_state.durable_sub !=null ) {
-//        var addresses = Array(session_state.durable_sub)
-//        session_state.durable_sub = null
-//        host.dispatch_queue {
-//          host.router.delete(addresses, security_context)
-//        }
-//      }
-//      session_state.strategy.destroy {
-//        ack_connect
-//      }
-//    }
-//
-//  }
-//
-//  def detach:Unit = {
-//    queue.assertExecuting()
-//
-//    if(!producerRoutes.isEmpty) {
-//      import collection.JavaConversions._
-//      val routes = producerRoutes.values.toSeq.toArray
-//      host.dispatch_queue {
-//        routes.foreach { route=>
-//          host.router.disconnect(Array(route.address), route)
-//        }
-//      }
-//      producerRoutes.clear
-//    }
-//
-//    if( clean_session ) {
-//      if(!mqtt_consumer.addresses.isEmpty) {
-//        var addresses = mqtt_consumer.addresses.keySet.toArray
-//        host.dispatch_queue {
-//          host.router.unbind(addresses, mqtt_consumer, false , security_context)
-//        }
-//        mqtt_consumer.addresses.clear()
-//      }
-//      session_state.subscriptions.clear()
-//    } else {
-//      if(session_state.durable_sub!=null) {
-//        var addresses = Array(session_state.durable_sub)
-//        host.dispatch_queue {
-//          host.router.unbind(addresses, mqtt_consumer, false , security_context)
-//        }
-//        mqtt_consumer.addresses.clear()
-//        session_state.durable_sub = null
-//      }
-//    }
-//
-//    in_flight_publishes.values.foreach { request =>
-//      if( request.ack!=null ) {
-//        if(request.delivered) {
-//          request.ack(Delivered)
-//        } else {
-//          request.ack(Undelivered)
-//        }
-//      }
-//    }
-//    in_flight_publishes.clear()
-//
-//    handler.get.sink_manager.close(mqtt_consumer.consumer_sink.downstream.get, (request)=>{})
-//    mqtt_consumer.consumer_sink.downstream = None
-//
-//    handler.get.on_transport_disconnected()
-//  }
-//
-//  def decode_destination(value:UTF8Buffer):SimpleAddress = {
-//    val rc = destination_parser.decode_single_destination(value.toString, (name)=>{
-//      SimpleAddress("topic", destination_parser.decode_path(name))
-//    })
-//    if( rc==null ) {
-//      handler.foreach(_.die("Invalid mqtt destination name: "+value))
-//    }
-//    rc
-//  }
-//
-//  /////////////////////////////////////////////////////////////////////
-//  //
-//  // Bits that deal with assigning message ids to QoS > 0 requests
-//  // and tracking those requests so that they can get replayed on a
-//  // reconnect.
-//  //
-//  /////////////////////////////////////////////////////////////////////
-//
-//  var in_flight_publishes = HashMap[Short, Request]()
-//
-//  def send(message: MessageSupport.Message): Unit = {
-//    queue.assertExecuting()
-//    handler.foreach(_.connection_sink.offer(Request(0, message, null)))
-//  }
-//
-//  def publish_completed(id: Short): Unit = {
-//    queue.assertExecuting()
-//    in_flight_publishes.remove(id) match {
-//      case Some(request) =>
-//        if ( request.ack != null ) {
-//          request.ack(Consumed)
-//        }
-//      case None =>
-//        // It's possible that on a reconnect, we get an ACK
-//        // in for message that was not dispatched yet. store
-//        // a place holder so we ack it upon the dispatch
-//        // attempt.
-//        in_flight_publishes.put(id, Request(id, null, null))
-//    }
-//  }
-//
-//  /////////////////////////////////////////////////////////////////////
-//  //
-//  // Bits that deal with processing new messages from the client.
-//  //
-//  /////////////////////////////////////////////////////////////////////
-//  def on_transport_command(command:AnyRef):Unit = command match {
-//    case command:MQTTFrame=>
-//
-//      command.messageType() match {
-//
-//        case PUBLISH.TYPE =>
-//          on_mqtt_publish(received(new PUBLISH().decode(command)))
-//
-//        // This follows a Publish with QoS EXACTLY_ONCE
-//        case PUBREL.TYPE =>
-//          var ack = received(new PUBREL().decode(command))
-//          // TODO: perhaps persist the processed list.. otherwise
-//          // we can't filter out dups after a broker restart.
-//          session_state.received_message_ids.remove(ack.messageId)
-//          session_state.strategy.update {
-//            send(new PUBCOMP().messageId(ack.messageId))
-//          }
-//
-//        case SUBSCRIBE.TYPE =>
-//          on_mqtt_subscribe(received(new SUBSCRIBE().decode(command)))
-//
-//        case UNSUBSCRIBE.TYPE =>
-//          on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command)))
-//
-//        // AT_LEAST_ONCE ack flow for a client subscription
-//        case PUBACK.TYPE =>
-//          val ack = received(new PUBACK().decode(command))
-//          publish_completed(ack.messageId)
-//
-//        // EXACTLY_ONCE ack flow for a client subscription
-//        case PUBREC.TYPE =>
-//          val ack = received(new PUBREC().decode(command))
-//          send(new PUBREL().messageId(ack.messageId))
-//
-//        case PUBCOMP.TYPE =>
-//          val ack: PUBCOMP = received(new PUBCOMP().decode(command))
-//          publish_completed(ack.messageId)
-//
-//        case PINGREQ.TYPE =>
-//          received(new PINGREQ().decode(command))
-//          send(new PINGRESP())
-//
-//        case DISCONNECT.TYPE =>
-//          received(new DISCONNECT())
-//          MqttSessionManager.disconnect(host_state, client_id, handler.get)
-//
-//        case _ =>
-//          handler.get.die("Invalid MQTT message type: "+command.messageType());
-//      }
-//    case "failure" =>
-//      // Publish the client's will
-//      publish_will {
-//        // then disconnect him.
-//        MqttSessionManager.disconnect(host_state, client_id, handler.get)
-//      }
-//
-//    case _=>
-//      handler.get.die("Internal Server Error: unexpected mqtt command: "+command.getClass);
-//  }
-//
-//  /////////////////////////////////////////////////////////////////////
-//  //
-//  // Bits that deal with processing PUBLISH messages
-//  //
-//  /////////////////////////////////////////////////////////////////////
-//  var producerRoutes = new LRUCache[UTF8Buffer, MqttProducerRoute](10) {
-//    override def onCacheEviction(eldest: Entry[UTF8Buffer, MqttProducerRoute]) = {
-//      host.dispatch_queue {
-//        host.router.disconnect(Array(eldest.getValue.address), eldest.getValue)
-//      }
-//    }
-//  }
-//  case class MqttProducerRoute(address:SimpleAddress, handler:MqttProtocolHandler) extends DeliveryProducerRoute(host.router) {
-//    override def send_buffer_size = handler.codec.getReadBufferSize
-//    override def connection = Some(handler.connection)
-//    override def dispatch_queue = queue
-//
-//    var suspended = false
-//
-//    refiller = ^ {
-//      if( suspended ) {
-//        suspended = false
-//        handler.resume_read
-//      }
-//    }
-//
-//  }
-//
-//  def on_mqtt_publish(publish:PUBLISH):Unit = {
-//
-//    if( (publish.qos eq EXACTLY_ONCE) && session_state.received_message_ids.contains(publish.messageId)) {
-//      val response = new PUBREC
-//      response.messageId(publish.messageId)
-//      send(response)
-//      return
-//    }
-//
-//    handler.get.messages_received += 1
-//
-//    queue.assertExecuting()
-//    producerRoutes.get(publish.topicName()) match {
-//      case null =>
-//        // create the producer route...
-//
-//        val destination = decode_destination(publish.topicName())
-//        val route = MqttProducerRoute(destination, handler.get)
-//
-//        // don't process commands until producer is connected...
-//        route.handler.suspend_read("route publish lookup")
-//        host.dispatch_queue {
-//          host.router.connect(Array(destination), route, security_context)
-//          queue {
-//            // We don't care if we are not allowed to send..
-//            if (!route.handler.connection.stopped) {
-//              route.handler.resume_read
-//              producerRoutes.put(publish.topicName(), route)
-//              send_via_route(route, publish)
-//            }
-//          }
-//        }
-//
-//      case route =>
-//        // we can re-use the existing producer route
-//        send_via_route(route, publish)
-//    }
-//  }
-//
-//  def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
-//    queue.assertExecuting()
-//
-//    def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
-//      val response = new PUBACK
-//      response.messageId(publish.messageId)
-//      send(response)
-//    }
-//
-//    def exactly_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
-//      queue.assertExecuting()
-//      // TODO: perhaps persist the processed list..
-//      session_state.received_message_ids.add(publish.messageId)
-//      session_state.strategy.update {
-//        val response = new PUBREC
-//        response.messageId(publish.messageId)
-//        send(response)
-//      }
-//    }
-//
-//    val ack = publish.qos match {
-//      case AT_LEAST_ONCE => at_least_once_ack _
-//      case EXACTLY_ONCE => exactly_once_ack _
-//      case AT_MOST_ONCE => null
-//    }
-//
-//    if( !route.targets.isEmpty ) {
-//      val delivery = new Delivery
-//      delivery.message = RawMessage(publish.payload)
-//      delivery.persistent = publish.qos().ordinal() > 0
-//      delivery.size = publish.payload.length
-//      delivery.ack = ack
-//      if( publish.retain() ) {
-//        if( delivery.size == 0 ) {
-//          delivery.retain = RetainRemove
-//        } else {
-//          delivery.retain = RetainSet
-//        }
-//      }
-//
-//      // routes can always accept at least 1 delivery...
-//      assert( !route.full )
-//      route.offer(delivery)
-//      if( route.full ) {
-//        // but once it gets full.. suspend to flow control the producer.
-//        route.suspended = true
-//        handler.get.suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
-//      }
-//
-//    } else {
-//      ack(null, null)
-//    }
-//  }
-//
-//
-//  //
-//  def publish_will(complete_close: =>Unit) = {
-//    if(connect_message!=null) {
-//      if( connect_message.willTopic()==null ) {
-//        complete_close
-//      } else {
-//
-//        val destination = decode_destination(connect_message.willTopic())
-//        val prodcuer = new DeliveryProducerRoute(host.router) {
-//          override def send_buffer_size = 1024*64
-//          override def connection = handler.map(_.connection)
-//          override def dispatch_queue = queue
-//          refiller = NOOP
-//        }
-//
-//        host.dispatch_queue {
-//          host.router.connect(Array(destination), prodcuer, security_context)
-//          queue {
-//            if(prodcuer.targets.isEmpty) {
-//              complete_close
-//            } else {
-//              val delivery = new Delivery
-//              delivery.message = RawMessage(connect_message.willMessage())
-//              delivery.size = connect_message.willMessage().length
-//              delivery.persistent = connect_message.willQos().ordinal() > 0
-//              if( connect_message.willRetain() ) {
-//                if( delivery.size == 0 ) {
-//                  delivery.retain = RetainRemove
-//                } else {
-//                  delivery.retain = RetainSet
-//                }
-//              }
-//
-//              delivery.ack = (x,y) => {
-//                host.dispatch_queue {
-//                  host.router.disconnect(Array(destination), prodcuer)
-//                }
-//                complete_close
-//              }
-//              handler.get.messages_received += 1
-//              prodcuer.offer(delivery)
-//            }
-//          }
-//        }
-//      }
-//    }
-//  }
-//  /////////////////////////////////////////////////////////////////////
-//  //
-//  // Bits that deal with subscriptions
-//  //
-//  /////////////////////////////////////////////////////////////////////
-//
-//  def on_mqtt_subscribe(sub:SUBSCRIBE):Unit = {
-//    subscribe(sub.topics()) {
-//      queue {
-//        session_state.strategy.update {
-//          val suback = new SUBACK
-//          suback.messageId(sub.messageId())
-//          suback.grantedQos(sub.topics().map(_.qos().ordinal().toByte))
-//          send(suback)
-//        }
-//      }
-//    }
-//  }
-//
-//  def subscribe(topics:Traversable[Topic])(on_subscribed: => Unit):Unit = {
-//    var addresses:Array[_ <: BindAddress] = topics.toArray.map { topic =>
-//      var address:BindAddress = decode_destination(topic.name)
-//      session_state.subscriptions += topic.name -> (topic, address)
-//      mqtt_consumer.addresses += address -> topic.qos
-//      if(PathParser.containsWildCards(address.path)) {
-//        mqtt_consumer.wildcards.put( address.path, topic.qos() )
-//      }
-//      address
-//    }
-//
-//    handler.get.subscription_count = mqtt_consumer.addresses.size
-//
-//    addresses = if( clean_session ) {
-//      addresses
-//    } else {
-//      session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), null, mqtt_consumer.addresses.keySet.toArray)
-//      Array(session_state.durable_sub)
-//    }
-//
-//    host.dispatch_queue {
-//      addresses.foreach { address=>
-//        host.router.bind(Array[BindAddress](address), mqtt_consumer, security_context) { result =>
-//        // MQTT ignores subscribe failures.
-//        }
-//      }
-//      on_subscribed
-//    }
-//
-//  }
-//
-//  def on_mqtt_unsubscribe(unsubscribe:UNSUBSCRIBE):Unit = {
-//
-//    val addresses:Array[_ <: BindAddress] = unsubscribe.topics.flatMap { topic =>
-//      session_state.subscriptions.remove(topic).map { case (topic, address)=>
-//        mqtt_consumer.addresses.remove(address)
-//        if(PathParser.containsWildCards(address.path)) {
-//          mqtt_consumer.wildcards.remove(address.path, topic.qos)
-//        }
-//        address
-//      }
-//    }
-//
-//    handler.get.subscription_count = mqtt_consumer.addresses.size
-//
-//    if(!clean_session) {
-//      session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), null, mqtt_consumer.addresses.keySet.toArray)
-//    }
-//
-//    host.dispatch_queue {
-//      if(clean_session) {
-//        host.router.unbind(addresses, mqtt_consumer, false, security_context)
-//      } else {
-//        if( mqtt_consumer.addresses.isEmpty ) {
-//          host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, true, security_context)
-//          session_state.durable_sub = null
-//        } else {
-//          host.router.bind(Array(session_state.durable_sub), mqtt_consumer, security_context) { result =>
-//          }
-//        }
-//      }
-//      queue {
-//        session_state.strategy.update {
-//          val ack = new UNSUBACK
-//          ack.messageId(unsubscribe.messageId())
-//          send(ack)
-//        }
-//      }
-//    }
-//
-//  }
-//
-//  var publish_body = false
-//
-//  lazy val mqtt_consumer = new MqttConsumer
-//  class MqttConsumer extends BaseRetained with DeliveryConsumer {
-//
-//    override def toString = "mqtt client:"+client_id+" remote address: "+security_context.remote_address
-//
-//    val addresses = HashMap[BindAddress, QoS]()
-//    val wildcards = new PathMap[QoS]()
-//
-//    val credit_window_source = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
-//      def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
-//        if( previous == null ) {
-//          event
-//        } else {
-//          (previous._1+event._1, previous._2+event._2)
-//        }
-//      }
-//      def mergeEvents(previous:(Int, Int), events:(Int, Int)) = mergeEvent(previous, events)
-//    }, dispatch_queue)
-//
-//    credit_window_source.setEventHandler(^{
-//      val data = credit_window_source.getData
-//      credit_window_filter.credit(data._1, data._2)
-//    });
-//    credit_window_source.resume
-//
-//    val consumer_sink = new MutableSink[Request]()
-//    consumer_sink.downstream = None
-//
-//    var next_seq_id = 1L
-//    def get_next_seq_id = {
-//      val rc = next_seq_id
-//      next_seq_id += 1
-//      rc
-//    }
-//
-//    def to_message_id(value:Long):Short = (
-//        0x8000 | // MQTT message ids cannot be zero, so we always set the highest bit.
-//        (value & 0x7FFF) // the lower 15 bits come for the original seq id.
-//      ).toShort
-//
-//    val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.flatMap{ event =>
-//      queue.assertExecuting()
-//      val (session, delivery) = event
-//
-//      session_manager.delivered(session, delivery.size)
-//
-//      // Look up which QoS we need to send this message with..
-//      var topic = delivery.sender.head.simple
-//      import collection.JavaConversions._
-//      addresses.get(topic).orElse(wildcards.get(topic.path).headOption) match {
-//
-//        case None =>
-//          // draining messages after an un-subscribe
-//          acked(delivery, Consumed)
-//          None
-//
-//        case Some(qos) =>
-//
-//          // Convert the Delivery into a Request
-//          var publish = new PUBLISH
-//          publish.topicName(new UTF8Buffer(destination_parser.encode_destination(Array(delivery.sender.head))))
-//          if( delivery.redeliveries > 0) {
-//            publish.dup(true)
-//          }
-//
-//          if( delivery.message.codec eq RawMessageCodec ) {
-//            publish.payload(delivery.message.asInstanceOf[RawMessage].payload)
-//          } else {
-//            if( publish_body ) {
-//              publish.payload(delivery.message.getBodyAs(classOf[Buffer]))
-//            } else {
-//              publish.payload(delivery.message.encoded)
-//            }
-//          }
-//
-//          handler.get.messages_sent += 1
-//
-//          if (delivery.ack!=null && (qos ne AT_MOST_ONCE)) {
-//            publish.qos(qos)
-//            val id = to_message_id(if(clean_session) {
-//              get_next_seq_id // generate our own seq id.
-//            } else {
-//              delivery.seq // use the durable sub's seq id..
-//            })
-//
-//            publish.messageId(id)
-//            val request = Request(id, publish, (result)=>{acked(delivery, result)})
-//            in_flight_publishes.put(id, request) match {
-//              case Some(r) =>
-//                // A reconnecting client could have acked before
-//                // we get dispatched by the durable sub.
-//                if( r.message == null ) {
-//                  in_flight_publishes.remove(id)
-//                  acked(delivery, Consumed)
-//                } else {
-//                  // Looks we sent out a msg with that id.  This could only
-//                  // happen once we send out 0x7FFF message and the first
-//                  // one has not been acked.
-//                  handler.foreach(_.async_die("Client not acking regularly.", null))
-//                }
-//              case None =>
-//            }
-//
-//            Some(request)
-//
-//          } else {
-//            // This callback gets executed once the message
-//            // sent to the transport.
-//            publish.qos(AT_MOST_ONCE)
-//            Some(Request(0, publish, (result)=>{ acked(delivery, result) }))
-//          }
-//      }
-//
-//    }, SessionDeliverySizer)
-//
-//    def acked(delivery:Delivery, result:DeliveryResult) = {
-//      queue.assertExecuting()
-//      credit_window_source.merge((delivery.size, 1))
-//      if( delivery.ack!=null ) {
-//        delivery.ack(result, null)
-//      }
-//    }
-//
-//    credit_window_filter.credit(handler.get.codec.getWriteBufferSize*2, 1)
-//
-//    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, queue, Delivery, Integer.MAX_VALUE/2, receive_buffer_size) {
-//      override def time_stamp = host.broker.now
-//    }
-//
-//    override def dispose() = queue {
-//      super.dispose()
-//    }
-//
-//    def dispatch_queue = queue
-//    override def connection = handler.map(_.connection)
-//    override def receive_buffer_size = 1024*64; // handler.codec.getWriteBufferSize
-//    def is_persistent = false
-//    def matches(delivery:Delivery):Boolean = true
-//
-//    //
-//    // Each destination we subscribe to will establish a session with us.
-//    //
-//    class MqttConsumerSession(val producer:DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
-//      producer.dispatch_queue.assertExecuting()
-//      retain
-//
-//      val downstream = session_manager.open(producer.dispatch_queue)
-//
-//      override def toString = "connection to "+handler.map(_.connection.transport.getRemoteAddress).getOrElse("unconnected")
-//
-//      def consumer = mqtt_consumer
-//      var closed = false
-//
-//      def close = {
-//        assert(producer.dispatch_queue.isExecuting)
-//        if( !closed ) {
-//          closed = true
-//          dispose
-//        }
-//      }
-//
-//      def dispose = {
-//        session_manager.close(downstream, (delivery)=>{
-//          // We have been closed so we have to nak any deliveries.
-//          if( delivery.ack!=null ) {
-//            delivery.ack(Undelivered, delivery.uow)
-//          }
-//        })
-//        release
-//      }
-//
-//      // Delegate all the flow control stuff to the session
-//      override def full = {
-//        val rc = super.full
-//        rc
-//      }
-//
-//      def offer(delivery:Delivery) = {
-//        if( full ) {
-//          false
-//        } else {
-//          delivery.message.retain()
-//          val rc = downstream.offer(delivery)
-//          assert(rc, "offer should be accepted since it was not full")
-//          true
-//        }
-//      }
-//
-//    }
-//    def connect(p:DeliveryProducer) = new MqttConsumerSession(p)
-//  }
-//
-//}



Mime
View raw message