activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1023987 - /activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Date Mon, 18 Oct 2010 20:25:28 GMT
Author: chirino
Date: Mon Oct 18 20:25:28 2010
New Revision: 1023987

URL: http://svn.apache.org/viewvc?rev=1023987&view=rev
Log:
Sync up StompRemoteClients.scala with Stomp constant changes

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1023987&r1=1023986&r2=1023987&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Mon Oct 18 20:25:28 2010
@@ -32,7 +32,6 @@ import _root_.org.fusesource.hawtdispatc
 
 class StompRemoteConsumer extends RemoteConsumer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
-  var messageCount = 0
 
   def watchdog(lastMessageCount: Int) : Unit = {
     val seconds = 10
@@ -56,18 +55,18 @@ class StompRemoteConsumer extends Remote
       ascii("/topic/" + destination.getName().toString());
     }
 
-    var frame = StompFrame(Stomp.Commands.CONNECT);
+    var frame = StompFrame(CONNECT);
     outboundSink.offer(frame);
 
     var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-    headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
-    headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+    headers ::= (DESTINATION, stompDestination)
+    headers ::= (ID, ascii("stomp-sub-" + name))
 
     if( persistent ) {
-      headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)
+      headers ::= (ACK_MODE, CLIENT)
     }
 
-    frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+    frame = StompFrame(SUBSCRIBE, headers);
     outboundSink.offer(frame);
     watchdog(messageCount)
   }
@@ -75,17 +74,17 @@ class StompRemoteConsumer extends Remote
   override def onTransportCommand(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
-      case StompFrame(Responses.CONNECTED, headers, _, _) =>
-      case StompFrame(Responses.MESSAGE, headers, content, _) =>
+      case StompFrame(CONNECTED, headers, _, _) =>
+      case StompFrame(MESSAGE, headers, content, _) =>
           messageReceived();
 
           // we client ack if persistent messages are being used.
           if( persistent ) {
-            var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
-            outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+            var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
+            outboundSink.offer(StompFrame(ACK, rc));
           }
 
-      case StompFrame(Responses.ERROR, headers, content, _) =>
+      case StompFrame(ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content));
       case _ =>
         onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -96,20 +95,17 @@ class StompRemoteConsumer extends Remote
       if (thinkTime > 0) {
         transport.suspendRead
         dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
-          messageCount += 1
           rate.increment();
           if (!stopped) {
             transport.resumeRead
           }
         })
       } else {
-        messageCount += 1
         rate.increment
       }
   }
 }
 
-
 class StompRemoteProducer extends RemoteProducer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
   var stompDestination: AsciiBuffer = null
@@ -117,12 +113,12 @@ class StompRemoteProducer extends Remote
 
   def send_next: Unit = {
       var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-      headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+      headers ::= (DESTINATION, stompDestination);
       if (property != null) {
         headers ::= (ascii(property), ascii(property));
       }
       if( persistent ) {
-        headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+        headers ::= ((RECEIPT_REQUESTED, ascii("x")));
       }
       //    var p = this.priority;
       //    if (priorityMod > 0) {
@@ -130,7 +126,7 @@ class StompRemoteProducer extends Remote
       //    }
 
       var content = ascii(createPayload());
-      frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+      frame = StompFrame(SEND, headers, BufferContent(content))
       drain()
   }
 
@@ -168,20 +164,20 @@ class StompRemoteProducer extends Remote
     } else {
       stompDestination = ascii("/topic/" + destination.getName().toString());
     }
-    outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+    outboundSink.offer(StompFrame(CONNECT));
     send_next
   }
 
   override def onTransportCommand(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
-      case StompFrame(Responses.RECEIPT, headers, _, _) =>
+      case StompFrame(RECEIPT, headers, _, _) =>
         assert( persistent )
         // we got the ack for the previous message we sent.. now send the next one.
         send_next
 
-      case StompFrame(Responses.CONNECTED, headers, _, _) =>
-      case StompFrame(Responses.ERROR, headers, content, _) =>
+      case StompFrame(CONNECTED, headers, _, _) =>
+      case StompFrame(ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content.utf8));
       case _ =>
         onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -189,3 +185,30 @@ class StompRemoteProducer extends Remote
   }
 }
 
+trait Watchog extends RemoteConsumer {
+  var messageCount = 0
+
+  def watchdog(lastMessageCount: Int): Unit = {
+    val seconds = 10
+    dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+      if (messageCount == lastMessageCount) {
+        warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+        stop
+      } else {
+        watchdog(messageCount)
+      }
+    })
+  }
+
+  abstract override protected def messageReceived() = {
+    super.messageReceived
+    messageCount += 1
+  }
+
+  abstract override protected def onConnected() = {
+    super.onConnected
+    watchdog(messageCount)
+  }
+
+}
+



Mime
View raw message