activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1390107 - in /activemq/activemq-apollo/trunk/apollo-mqtt/src: main/scala/org/apache/activemq/apollo/mqtt/ test/resources/ test/scala/org/apache/activemq/apollo/mqtt/test/
Date Tue, 25 Sep 2012 20:04:46 GMT
Author: chirino
Date: Tue Sep 25 20:04:46 2012
New Revision: 1390107

URL: http://svn.apache.org/viewvc?rev=1390107&view=rev
Log:
Adding tests and fixing bugs in the MQTT protocol impl.

Added:
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt.xml
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/log4j.properties
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttTestSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1390107&r1=1390106&r2=1390107&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
Tue Sep 25 20:04:46 2012
@@ -1142,6 +1142,8 @@ case class MqttSession(host_state:HostSt
     val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.flatMap{
event =>
       queue.assertExecuting()
       val (session, delivery) = event
+
+      session_manager.delivered(session, delivery.size)
       
       // Look up which QoS we need to send this message with..
       var topic = delivery.sender.head.simple
@@ -1222,7 +1224,7 @@ case class MqttSession(host_state:HostSt
     
     credit_window_filter.credit(handler.get.codec.getWriteBufferSize*2, 1)
 
-    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, queue, Delivery,
Integer.MAX_VALUE/2, receive_buffer_size) {
+    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter,
queue, Delivery, Integer.MAX_VALUE/2, receive_buffer_size) {
       override def time_stamp = host.broker.now
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt-leveldb.xml?rev=1390107&r1=1390106&r2=1390107&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt-leveldb.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt-leveldb.xml
Tue Sep 25 20:04:46 2012
@@ -22,6 +22,9 @@
     <host_name>localhost</host_name>
 
     <queue name="unified.**" unified="true"/>
+    <topic id="queued.**" slow_consumer_policy="queue">
+      <subscription tail_buffer="4k"/>
+    </topic>
 
     <leveldb_store directory="${testdatadir}"/>
   </virtual_host>

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt.xml?rev=1390107&r1=1390106&r2=1390107&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt.xml Tue Sep
25 20:04:46 2012
@@ -23,6 +23,9 @@
     <host_name>127.0.0.1</host_name>
 
     <queue name="unified.**" unified="true"/>
+    <topic id="queued.**" slow_consumer_policy="queue">
+      <subscription tail_buffer="4k"/>
+    </topic>
 
   </virtual_host>
 

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/log4j.properties?rev=1390107&r1=1390106&r2=1390107&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/log4j.properties (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/log4j.properties Tue Sep
25 20:04:46 2012
@@ -23,7 +23,7 @@ log4j.logger.org.apache.activemq.apollo=
 #
 # Uncomment one of the following to enable debug logging
 #
-log4j.logger.org.apache.activemq.apollo=TRACE
+# log4j.logger.org.apache.activemq.apollo=TRACE
 # log4j.logger.org.apache.activemq.apollo.broker=DEBUG
 # log4j.logger.org.apache.activemq.apollo.web=DEBUG
 # log4j.logger.org.apache.activemq.apollo.cli=DEBUG

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala?rev=1390107&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
Tue Sep 25 20:04:46 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.mqtt.test
+
+import org.fusesource.mqtt.client._
+import org.apache.activemq.apollo.broker.Broker
+import org.fusesource.hawtdispatch._
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+
+class MqttLoadTest extends MqttTestSupport {
+
+  for( prefix <- List("normal", "queued") ) {
+
+    test("Load on: "+prefix) {
+
+      val topic = prefix+"/load"
+
+      val receiver = create_client
+      connect(receiver)
+      subscribe(topic, QoS.AT_LEAST_ONCE, receiver)
+
+      val done = new CountDownLatch(1)
+      Broker.BLOCKABLE_THREAD_POOL {
+        for(i <- 1 to 1000) {
+          should_receive("%0256d".format(i), topic, receiver)
+        }
+        done.countDown()
+      }
+
+      connect()
+      for(i <- 1 to 1000) {
+        publish(topic, "%0256d".format(i), QoS.AT_LEAST_ONCE, false)
+      }
+
+      done.await(30, TimeUnit.SECONDS) should be(true)
+    }
+
+  }
+}
+
+// This test is failing with: java.lang.AssertionError: assertion failed: locator_based.unary_$bang.$bar$bar(uow.have_locators)
+//class MqttLoadLevelDBTest extends MqttLoadTest {
+//  override def broker_config_uri = "xml:classpath:apollo-mqtt-leveldb.xml"
+//}

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttTestSupport.scala?rev=1390107&r1=1390106&r2=1390107&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttTestSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttTestSupport.scala
Tue Sep 25 20:04:46 2012
@@ -64,7 +64,9 @@ class MqttTestSupport extends BrokerFunS
     }
 
     def disconnect() = {
-      connection.disconnect()
+      if( connection!=null ) {
+        connection.disconnect()
+      }
     }
   }
 



Mime
View raw message