activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1032818 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Date Tue, 09 Nov 2010 02:28:55 GMT
Author: chirino
Date: Tue Nov  9 02:28:55 2010
New Revision: 1032818

URL: http://svn.apache.org/viewvc?rev=1032818&view=rev
Log:
The heartbeat monitor seems like a commonly needed thing; extracting it out to the broker module.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala?rev=1032818&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
Tue Nov  9 02:28:55 2010
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.transport.Transport
+import java.util.concurrent.TimeUnit
+
+/**
+ * <p>A HeartBeatMonitor can be used to periodically check the activity
+ * of a transport to see if it is still alive or if a keep alive
+ * packet needs to be transmitted to keep it alive.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HeartBeatMonitor() {
+
+  var transport:Transport = _
+  var write_interval = 0L
+  var read_interval = 0L
+
+  var on_keep_alive = ()=>{}
+  var on_dead = ()=>{}
+
+  var session = 0
+
+  def schedual_check_writes(session:Int):Unit = {
+    val last_write_counter = transport.getProtocolCodec.getWriteCounter()
+    transport.getDispatchQueue.after(write_interval, TimeUnit.MILLISECONDS) {
+      if( this.session == session ) {
+        if( last_write_counter==transport.getProtocolCodec.getWriteCounter ) {
+          on_keep_alive()
+        }
+        schedual_check_writes(session)
+      }
+    }
+  }
+
+  def schedual_check_reads(session:Int):Unit = {
+    val last_read_counter = transport.getProtocolCodec.getReadCounter()
+    transport.getDispatchQueue.after(read_interval, TimeUnit.MILLISECONDS) {
+      if( this.session == session ) {
+        if( last_read_counter==transport.getProtocolCodec.getReadCounter ) {
+          on_dead()
+        }
+        schedual_check_reads(session)
+      }
+    }
+  }
+
+  def start = {
+    session += 1
+    if( write_interval!=0 ) {
+      schedual_check_writes(session)
+    }
+    if( read_interval!=0 ) {
+      schedual_check_reads(session)
+    }
+  }
+
+  def stop = {
+    session += 1
+  }
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1032818&r1=1032817&r2=1032818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Tue Nov  9 02:28:55 2010
@@ -23,8 +23,8 @@ import org.fusesource.hawtdispatch._
 
 import AsciiBuffer._
 import org.apache.activemq.apollo.broker._
-import protocol.{ProtocolFactory, Protocol, ProtocolHandler}
 import java.lang.String
+import protocol.{HeartBeatMonitor, ProtocolFactory, Protocol, ProtocolHandler}
 import Stomp._
 import BufferConversions._
 import java.io.IOException
@@ -96,56 +96,6 @@ object StompProtocol extends StompProtoc
 }
 
 
-class HeartBeatMonitor() {
-
-  var transport:Transport = _
-  var write_interval = 0L
-  var read_interval = 0L
-
-  var on_keep_alive = ()=>{}
-  var on_dead = ()=>{}
-
-  var session = 0
-
-  def schedual_check_writes(session:Int):Unit = {
-    val last_write_counter = transport.getProtocolCodec.getWriteCounter()
-    transport.getDispatchQueue.after(write_interval, TimeUnit.MILLISECONDS) {
-      if( this.session == session ) {
-        if( last_write_counter==transport.getProtocolCodec.getWriteCounter ) {
-          on_keep_alive()
-        }
-        schedual_check_writes(session)
-      }
-    }
-  }
-
-  def schedual_check_reads(session:Int):Unit = {
-    val last_read_counter = transport.getProtocolCodec.getReadCounter()
-    transport.getDispatchQueue.after(read_interval, TimeUnit.MILLISECONDS) {
-      if( this.session == session ) {
-        if( last_read_counter==transport.getProtocolCodec.getReadCounter ) {
-          on_dead()
-        }
-        schedual_check_reads(session)
-      }
-    }
-  }
-
-  def start = {
-    session += 1
-    if( write_interval!=0 ) {
-      schedual_check_writes(session)
-    }
-    if( read_interval!=0 ) {
-      schedual_check_reads(session)
-    }
-  }
-
-  def stop = {
-    session += 1
-  }
-}
-
 object StompProtocolHandler extends Log {
 
   // How long we hold a failed connection open so that the remote end



Mime
View raw message