activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1330823 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Date Thu, 26 Apr 2012 12:53:34 GMT
Author: chirino
Date: Thu Apr 26 12:53:33 2012
New Revision: 1330823

URL: http://svn.apache.org/viewvc?rev=1330823&view=rev
Log:
Fixes APLO-196: Allow protocol filters to drop messages.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1330823&r1=1330822&r2=1330823&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Thu Apr 26 12:53:33 2012
@@ -90,10 +90,16 @@ trait ProtocolHandler {
 
 }
 
-object ProtocolFilter {
+@deprecated(message="Please use the ProtocolFilter2 interface instead", since="1.3")
+trait ProtocolFilter {
+  def filter[T](command: T):T
+}
+
+object ProtocolFilter2 {
+
   def create_filters(clazzes:List[String], handler:ProtocolHandler) = {
     clazzes.map { clazz =>
-      val filter = Broker.class_loader.loadClass(clazz).newInstance().asInstanceOf[ProtocolFilter]
+      val filter = ProtocolFilter2(Broker.class_loader.loadClass(clazz).newInstance().asInstanceOf[AnyRef])
 
       type ProtocolHandlerAware = { var protocol_handler:ProtocolHandler }
       try {
@@ -103,8 +109,39 @@ object ProtocolFilter {
       filter
     }
   }
+
+  /**
+   * Allows you to convert any ProtocolFilter object into a ProtocolFilter2 object.
+   */
+  def apply(filter:AnyRef):ProtocolFilter2 = {
+    filter match {
+      case self:ProtocolFilter2 => self
+      case self:ProtocolFilter => new ProtocolFilter2() {
+        override def filter_inbound[T](command: T): Option[T] = Some(self.filter(command))
+        override def filter_outbound[T](command: T): Option[T] = Some(command)
+      } 
+      case null => null
+      case _ => throw new IllegalArgumentException("Invalid protocol filter type: "+filter.getClass)
+    }
+  }
 }
 
-trait ProtocolFilter {
-  def filter[T](command: T):T
+/**
+ * A Protocol filter can filter frames being sent/received to and from a client.  It can modify
+ * the frame or even drop it.
+ */
+abstract class ProtocolFilter2 {
+
+  /**
+   * Filters a command frame received from a client.
+   * returns None if the filter wants to drop the frame.
+   */
+  def filter_inbound[T](frame: T):Option[T]
+
+  /**
+   * Filters a command frame being sent client.
+   * returns None if the filter wants to drop the frame.
+   */
+  def filter_outbound[T](frame: T):Option[T]
 }
+

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1330823&r1=1330822&r2=1330823&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Apr 26 12:53:33 2012
@@ -23,7 +23,7 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker._
 import Buffer._
 import java.lang.String
-import protocol.{ProtocolFilter, ProtocolHandler}
+import protocol.{ProtocolFilter2, ProtocolHandler}
 import security.SecurityContext
 import Stomp._
 import org.apache.activemq.apollo.selector.SelectorParser
@@ -579,7 +579,7 @@ class StompProtocolHandler extends Proto
   var waiting_on = WAITING_ON_CLIENT_REQUEST
   var config:StompDTO = _
 
-  var protocol_filters = List[ProtocolFilter]()
+  var protocol_filters = List[ProtocolFilter2]()
 
   var destination_parser = Stomp.destination_parser
   var protocol_convert = "full"
@@ -623,7 +623,7 @@ class StompProtocolHandler extends Proto
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
 
-    protocol_filters = ProtocolFilter.create_filters(config.protocol_filters.toList, this)
+    protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
 
     import OptionSupport._
     Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length = _ )
@@ -717,10 +717,23 @@ class StompProtocolHandler extends Proto
 
   override def on_transport_connected() = {
     connection_log = connection.connector.broker.connection_log
-    sink_manager = new SinkMux[StompFrame]( connection.transport_sink.map {x=>
+
+    var filtering_sink:Sink[StompFrame] = connection.transport_sink.map { x=>
       trace("sending frame: %s", x)
       x
-    })
+    }
+
+    if(!protocol_filters.isEmpty) {
+      filtering_sink = filtering_sink.flatMap {x=>
+        var cur = Option(x)
+        protocol_filters.foreach { filter =>
+          cur = cur.flatMap(filter.filter_outbound(_))
+        }
+        cur
+      }
+    }
+
+    sink_manager = new SinkMux[StompFrame](filtering_sink)
     connection_sink = new OverflowSink(sink_manager.open());
     resume_read
   }
@@ -769,9 +782,17 @@ class StompProtocolHandler extends Proto
 
           trace("received frame: %s", f)
 
-          var frame = f
-          protocol_filters.foreach { filter =>
-            frame = filter.filter(frame)
+          val frame = if(!protocol_filters.isEmpty) {
+            var cur = Option(f)
+            protocol_filters.foreach { filter =>
+              cur = cur.flatMap(filter.filter_inbound(_))
+            }
+            cur match {
+              case Some(f) => f
+              case None => return // dropping the frame.
+            } 
+          } else {
+            f
           }
 
           if( protocol_version == null ) {



Mime
View raw message