activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1023948 - in /activemq/activemq-apollo/trunk/apollo-stomp/src: main/scala/org/apache/activemq/apollo/stomp/ test/scala/org/apache/activemq/apollo/stomp/
Date Mon, 18 Oct 2010 18:53:17 GMT
Author: chirino
Date: Mon Oct 18 18:53:17 2010
New Revision: 1023948

URL: http://svn.apache.org/viewvc?rev=1023948&view=rev
Log:
Added the Stomp 1.1 heart-beat feature

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Mon Oct 18 18:53:17 2010
@@ -203,7 +203,12 @@ class StompCodec extends ProtocolCodec w
       ProtocolCodec.BufferState.FULL
     } else {
       val was_empty = is_empty
-      encode(command.asInstanceOf[StompFrame], next_write_buffer);
+      command match {
+        case buffer:Buffer=>
+          buffer.writeTo(next_write_buffer.asInstanceOf[DataOutput])        
+        case frame:StompFrame=>
+          encode(frame, next_write_buffer);
+      }
       if( was_empty ) {
         ProtocolCodec.BufferState.WAS_EMPTY
       } else {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Mon Oct 18 18:53:17 2010
@@ -398,6 +398,7 @@ object Stomp {
   val REQUEST_ID = ascii("request-id")
   val ACCEPT_VERSION = ascii("accept-version")
   val HOST = ascii("host")
+  val HEART_BEAT = ascii("heart-beat")
 
   val MESSAGE_HEADER = ascii("message")
   val VERSION = ascii("version")
@@ -414,6 +415,7 @@ object Stomp {
   val INDIVIDUAL = ascii("client-individual")
   val V1_0 = ascii("1.0")
   val V1_1 = ascii("1.1")
+  val DEFAULT_HEAT_BEAT = ascii("0,0")
 
   val SUPPORTED_PROTOCOL_VERSIONS = Set(V1_0,V1_1)
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Mon Oct 18 18:53:17 2010
@@ -93,17 +93,86 @@ object StompProtocol extends StompProtoc
 
 }
 
-object StompProtocolHandler extends Log
+
+class HeartBeatMonitor() {
+
+  var transport:Transport = _
+  var write_interval = 0L
+  var read_interval = 0L
+
+  var on_keep_alive = ()=>{}
+  var on_dead = ()=>{}
+
+  var session = 0
+
+  def schedual_check_writes(session:Int):Unit = {
+    val last_write_counter = transport.getProtocolCodec.getWriteCounter()
+    transport.getDispatchQueue.after(write_interval, TimeUnit.MILLISECONDS) {
+      if( this.session == session ) {
+        if( last_write_counter==transport.getProtocolCodec.getWriteCounter ) {
+          on_keep_alive()
+        }
+        schedual_check_writes(session)
+      }
+    }
+  }
+
+  def schedual_check_reads(session:Int):Unit = {
+    val last_read_counter = transport.getProtocolCodec.getReadCounter()
+    transport.getDispatchQueue.after(read_interval, TimeUnit.MILLISECONDS) {
+      if( this.session == session ) {
+        if( last_read_counter==transport.getProtocolCodec.getReadCounter ) {
+          on_dead()
+        }
+        schedual_check_reads(session)
+      }
+    }
+  }
+
+  def start = {
+    session += 1
+    if( write_interval!=0 ) {
+      schedual_check_writes(session)
+    }
+    if( read_interval!=0 ) {
+      schedual_check_reads(session)
+    }
+  }
+
+  def stop = {
+    session += 1
+  }
+}
+
+object StompProtocolHandler extends Log {
+
+  // How long we hold a failed connection open so that the remote end
+  // can get the resulting error message.
+  val DEFAULT_DIE_DELAY = 5*1000L
+  var die_delay = DEFAULT_DIE_DELAY
+
+    // How often we can send heartbeats of the connection is idle.
+  val DEFAULT_OUTBOUND_HEARTBEAT = 100L
+  var outbound_heartbeat = DEFAULT_OUTBOUND_HEARTBEAT
+
+  // How often we want to get heartbeats from the peer if the connection is idle.
+  val DEFAULT_INBOUND_HEARTBEAT = 10*1000L
+  var inbound_heartbeat = DEFAULT_INBOUND_HEARTBEAT
+
+}
+
+import StompProtocolHandler._
+
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class StompProtocolHandler extends ProtocolHandler with DispatchLogging {
-  
+
   def protocol = "stomp"
 
   override protected def log = StompProtocolHandler
-  
+
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
   class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination,
val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO)
extends BaseRetained with DeliveryConsumer {
@@ -115,7 +184,7 @@ class StompProtocolHandler extends Proto
       dispatchQueue.release
     })
 
-    override def connection = Some(StompProtocolHandler.this.connection) 
+    override def connection = Some(StompProtocolHandler.this.connection)
 
     def matches(delivery:Delivery) = {
       if( delivery.message.protocol eq StompProtocol ) {
@@ -170,7 +239,7 @@ class StompProtocolHandler extends Proto
           true
         }
       }
-      
+
       def refiller = session.refiller
       def refiller_=(value:Runnable) = { session.refiller=value }
 
@@ -189,16 +258,24 @@ class StompProtocolHandler extends Proto
   private def queue = connection.dispatchQueue
   var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
 
+
+  var session_id:Option[AsciiBuffer] = None
+  var protocol_version:Option[AsciiBuffer] = None
+
+  var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
+
   override def onTransportConnected() = {
 
     session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>x},
dispatchQueue, StompFrame)
     connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
     connection_sink.refiller = ^{}
     connection.transport.resumeRead
+
   }
 
   override def onTransportDisconnected() = {
     if( !closed ) {
+      heart_beat_monitor.stop
       closed=true;
       producerRoutes.foreach{
         case(_,route)=> host.router.disconnect(route)
@@ -223,56 +300,102 @@ class StompProtocolHandler extends Proto
   override def onTransportCommand(command:Any) = {
     try {
       command match {
-        case StompFrame(SEND, _, _, _) =>
-          on_stomp_send(command.asInstanceOf[StompFrame])
-        case StompFrame(ACK, headers, content, _) =>
-          on_stomp_ack(command.asInstanceOf[StompFrame])
-
-        case StompFrame(BEGIN, headers, content, _) =>
-          on_stomp_begin(headers)
-        case StompFrame(COMMIT, headers, content, _) =>
-          on_stomp_commit(headers)
-        case StompFrame(ABORT, headers, content, _) =>
-          on_stomp_abort(headers)
-
-        case StompFrame(SUBSCRIBE, headers, content, _) =>
-          info("got command: %s", command)
-          on_stomp_subscribe(headers)
-
-        case StompFrame(STOMP, headers, _, _) =>
-          info("got command: %s", command)
-          on_stomp_connect(headers)
-        case StompFrame(CONNECT, headers, _, _) =>
-          info("got command: %s", command)
-          on_stomp_connect(headers)
-
-        case StompFrame(DISCONNECT, headers, content, _t) =>
-          info("got command: %s", command)
-          connection.stop
         case s:StompCodec =>
           // this is passed on to us by the protocol discriminator
           // so we know which wire format is being used.
-        case StompFrame(unknown, _, _, _) =>
-          die("Unsupported STOMP command: "+unknown);
-        case _ =>
-          die("Unsupported command: "+command);
+        case frame:StompFrame=>
+
+          info("got command: %s", frame)
+          if( protocol_version eq None ) {
+            frame.action match {
+              case STOMP =>
+                on_stomp_connect(frame.headers)
+              case CONNECT =>
+                on_stomp_connect(frame.headers)
+              case DISCONNECT =>
+                connection.stop
+              case _ =>
+                die("Client must first send a connect frame");
+            }
+
+          } else {
+            frame.action match {
+              case SEND =>
+                on_stomp_send(frame)
+              case ACK =>
+                on_stomp_ack(frame)
+
+              case BEGIN =>
+                on_stomp_begin(frame.headers)
+              case COMMIT =>
+                on_stomp_commit(frame.headers)
+              case ABORT =>
+                on_stomp_abort(frame.headers)
+              case SUBSCRIBE =>
+                on_stomp_subscribe(frame.headers)
+
+              case DISCONNECT =>
+                connection.stop
+
+              case _ =>
+                die("Invalid frame: "+frame.action);
+            }
+          }
+
+        case _=>
+          warn("Internal Server Error: unexpected command type")
+          die("Internal Server Error");
       }
     }  catch {
       case e:Exception =>
-        die("Unexpected Error", e.toString);
+        warn(e, "Internal Server Error")
+        die("Internal Server Error");
     }
   }
 
+  def on_stomp_connect(headers:HeaderMap):Unit = {
 
-  var session_id:Option[AsciiBuffer] = None
-  var protocol_version:Option[AsciiBuffer] = None
-
-  def on_stomp_connect(headers:HeaderMap) = {
 
     protocol_version = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii).reverse.find{v=>
       SUPPORTED_PROTOCOL_VERSIONS.contains(v)
     }
 
+    val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEAT_BEAT)
+    heart_beat.split(COMMA).map(_.ascii) match {
+      case Array(cx,cy) =>
+        try {
+          val can_send = cx.toString.toLong
+          val please_send = cy.toString.toLong
+
+          if( inbound_heartbeat>=0 && can_send > 0 ) {
+            heart_beat_monitor.read_interval = inbound_heartbeat.max(can_send)
+
+            // lets be a little forgiving to account to packet transmission latency.
+            heart_beat_monitor.read_interval += heart_beat_monitor.read_interval.min(5000)
+
+            heart_beat_monitor.on_dead = () => {
+              die("Stale connection.  Missed heartbeat.")
+            }
+          }
+          if( outbound_heartbeat>=0 && please_send > 0 ) {
+            heart_beat_monitor.write_interval = outbound_heartbeat.max(please_send)
+            heart_beat_monitor.on_keep_alive = () => {
+              connection.transport.offer(NEWLINE_BUFFER)
+            }
+          }
+
+          heart_beat_monitor.transport = connection.transport
+          heart_beat_monitor.start
+
+        } catch {
+          case x:NumberFormatException=>
+            die("Invalid heart-beat header: "+heart_beat)
+            return
+        }
+      case _ =>
+        die("Invalid heart-beat header: "+heart_beat)
+        return
+    }
 
     protocol_version match {
       case None =>
@@ -281,21 +404,25 @@ class StompProtocolHandler extends Proto
         _die((MESSAGE_HEADER, ascii("version not supported"))::
             (VERSION, ascii(supported_versions))::Nil,
             "Supported protocol versions are %s".format(supported_versions))
+        return
 
       case Some(x) =>
         connection.transport.suspendRead
 
         val host_header = get(headers, HOST)
-        val cb: (VirtualHost)=>Unit = queue.wrap { (host)=>
-
+        val cb: (VirtualHost)=>Unit = (host)=>
+          queue {
             if(host!=null) {
               this.host=host
 
+              val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
               session_id = Some(ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet))
+
               connection_sink.offer(
                 StompFrame(CONNECTED, List(
                   (VERSION, protocol_version.get),
-                  (SESSION, session_id.get)
+                  (SESSION, session_id.get),
+                  (HEART_BEAT, outbound_heart_beat_header)
                 )))
 
               if( this.host.direct_buffer_pool!=null ) {
@@ -595,7 +722,7 @@ class StompProtocolHandler extends Proto
       connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(explained)))
)
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
-      queue.after(5, TimeUnit.SECONDS) {
+      queue.after(die_delay, TimeUnit.MILLISECONDS) {
         connection.stop()
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
Mon Oct 18 18:53:17 2010
@@ -71,13 +71,17 @@ import java.io._
     }
 
     def receive():String = {
+      var start = true;
       val buffer = new BAOS()
       var c = in.read
       while( c >= 0 ) {
         if( c==0 ) {
           return new String(buffer.toByteArray, "UTF-8")
         }
-        buffer.write(c)
+        if( !start || c!= Stomp.NEWLINE) {
+          start = false
+          buffer.write(c)
+        }
         c = in.read()
       }
       throw new EOFException()

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023948&r1=1023947&r2=1023948&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Oct 18 18:53:17 2010
@@ -37,7 +37,7 @@ class StompTestSupport extends FunSuiteS
 
 }
 
-class Stomp10Test extends StompTestSupport {
+class Stomp10ConnectTest extends StompTestSupport {
 
   test("Stomp 1.0 CONNECT") {
     val client = new StompClient
@@ -54,7 +54,7 @@ class Stomp10Test extends StompTestSuppo
 
 }
 
-class Stomp11Test extends StompTestSupport {
+class Stomp11ConnectTest extends StompTestSupport {
 
   test("Stomp 1.1 CONNECT") {
     val client = new StompClient
@@ -130,4 +130,66 @@ class Stomp11Test extends StompTestSuppo
     frame should include regex("""message:.+?\n""")
   }
 
+}
+
+class Stomp11HeartBeatTest extends StompTestSupport {
+
+  test("Stomp 1.1 Broker sends heart-beat") {
+    val client = new StompClient
+    client.open("localhost", 61613)
+
+    client.send(
+      "CONNECT\n" +
+      "accept-version:1.1\n" +
+      "host:default\n" +
+      "heart-beat:0,1000\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex("""heart-beat:.+?\n""")
+
+    def heart_beat_after(time:Long) {
+      var start = System.currentTimeMillis
+      val c = client.in.read()
+      c should be === (Stomp.NEWLINE)
+      var end = System.currentTimeMillis
+      (end - start) should be >= time
+    }
+    client.in.read()
+    heart_beat_after(900)
+    heart_beat_after(900)
+  }
+
+
+  test("Stomp 1.1 Broker times out idle connection") {
+    StompProtocolHandler.inbound_heartbeat = 1000L
+    try {
+      val client = new StompClient
+      client.open("localhost", 61613)
+
+      client.send(
+        "CONNECT\n" +
+        "accept-version:1.1\n" +
+        "host:default\n" +
+        "heart-beat:1000,0\n" +
+        "\n")
+
+      var frame = client.receive()
+      frame should startWith("CONNECTED\n")
+      frame should include regex("""heart-beat:.+?\n""")
+
+      var start = System.currentTimeMillis
+
+      frame = client.receive()
+      frame should startWith("ERROR\n")
+      frame should include regex("""message:.+?\n""")
+      
+      var end = System.currentTimeMillis
+      (end - start) should be >= 1000L
+
+    } finally {
+      StompProtocolHandler.inbound_heartbeat = StompProtocolHandler.DEFAULT_INBOUND_HEARTBEAT
+    }
+  }
+
 }
\ No newline at end of file



Mime
View raw message