activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1023946 - in /activemq/activemq-apollo/trunk: apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/
Date Mon, 18 Oct 2010 18:53:00 GMT
Author: chirino
Date: Mon Oct 18 18:53:00 2010
New Revision: 1023946

URL: http://svn.apache.org/viewvc?rev=1023946&view=rev
Log:
Disable solinger in tcp connections so they shutdown fast once closed. Stomp protocol handler
now adds a delay when it it sends an error to the client before closing out the connection.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Mon Oct 18 18:53:00 2010
@@ -34,6 +34,7 @@ import org.apache.activemq.apollo.transp
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.dto.{BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
+import java.util.concurrent.TimeUnit
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -592,9 +593,11 @@ class StompProtocolHandler extends Proto
     if( !connection.stopped ) {
       connection.transport.suspendRead
       connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(explained)))
)
-      ^ {
+      // TODO: if there are too many open connections we should just close the connection
+      // without waiting for the error to get sent to the client.
+      queue.after(5, TimeUnit.SECONDS) {
         connection.stop()
-      } >>: queue
+      }
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Oct 18 18:53:00 2010
@@ -17,13 +17,27 @@
 package org.apache.activemq.apollo.stomp
 
 import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.activemq.apollo.util.FunSuiteSupport
 import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
 
-class StompTest extends FunSuiteSupport with ShouldMatchers {
+class StompTestSupport extends FunSuiteSupport with ShouldMatchers {
   var broker: Broker = null
 
+  override protected def beforeAll() = {
+    val uri = "xml:classpath:activemq-stomp.xml"
+    info("Loading broker configuration from the classpath with URI: " + uri)
+    broker = BrokerFactory.createBroker(uri, true)
+    Thread.sleep(1000); //TODO implement waitUntilStarted
+  }
+
+  override protected def afterAll() = {
+    broker.stop
+  }
+
+
+}
+
+class Stomp10Test extends StompTestSupport {
 
   test("Stomp 1.0 CONNECT") {
     val client = new StompClient
@@ -38,6 +52,9 @@ class StompTest extends FunSuiteSupport 
     frame should include("version:1.0\n")
   }
 
+}
+
+class Stomp11Test extends StompTestSupport {
 
   test("Stomp 1.1 CONNECT") {
     val client = new StompClient
@@ -69,7 +86,7 @@ class StompTest extends FunSuiteSupport 
     frame should include("version:1.1\n")
   }
 
-  test("Stomp 1.1 CONNECT /w Version Fallback") {
+  test("Stomp 1.1 CONNECT /w valid version fallback") {
     val client = new StompClient
     client.open("localhost", 61613)
 
@@ -84,30 +101,33 @@ class StompTest extends FunSuiteSupport 
     frame should include("version:1.0\n")
   }
 
-  test("Stomp CONNECT /w invalid virtual host") {
+  test("Stomp 1.1 CONNECT /w invalid version fallback") {
     val client = new StompClient
     client.open("localhost", 61613)
 
     client.send(
       "CONNECT\n" +
-      "accept-version:1.0,1.1\n" +
-      "host:invalid\n" +
+      "accept-version:9.0,10.0\n" +
+      "host:default\n" +
       "\n")
     val frame = client.receive()
     frame should startWith("ERROR\n")
+    frame should include regex("""version:.+?\n""")
     frame should include regex("""message:.+?\n""")
   }
 
+  test("Stomp CONNECT /w invalid virtual host") {
+    val client = new StompClient
+    client.open("localhost", 61613)
 
-  override protected def beforeAll() = {
-    val uri = "xml:classpath:activemq-stomp.xml"
-    info("Loading broker configuration from the classpath with URI: " + uri)
-    broker = BrokerFactory.createBroker(uri, true)
-    Thread.sleep(1000); //TODO implement waitUntilStarted
-  }
-
-  override protected def afterAll() = {
-    broker.stop
+    client.send(
+      "CONNECT\n" +
+      "accept-version:1.0,1.1\n" +
+      "host:invalid\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex("""message:.+?\n""")
   }
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Mon Oct 18 18:53:00 2010
@@ -201,6 +201,8 @@ public class TcpTransport extends JavaBa
 
         this.channel.configureBlocking(false);
         this.remoteAddress = channel.socket().getRemoteSocketAddress().toString();
+        channel.socket().setSoLinger(true, 0);
+
         this.socketState = new CONNECTED();
     }
 



Mime
View raw message