activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1185210 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
Date Mon, 17 Oct 2011 14:33:41 GMT
Author: tabish
Date: Mon Oct 17 14:33:41 2011
New Revision: 1185210

URL: http://svn.apache.org/viewvc?rev=1185210&view=rev
Log:
Push enough data through the socket so that the socket close gets detected.  

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java?rev=1185210&r1=1185209&r2=1185210&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
Mon Oct 17 14:33:41 2011
@@ -42,10 +42,10 @@ import org.slf4j.LoggerFactory;
 
 public class SoWriteTimeoutTest extends JmsTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class);
-    
+
     final int receiveBufferSize = 16*1024;
     public String brokerTransportScheme = "nio";
-    
+
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();
         broker.addConnector(brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize="+
receiveBufferSize);
@@ -54,30 +54,30 @@ public class SoWriteTimeoutTest extends 
         }
         return broker;
     }
-    
+
     public void initCombosForTestWriteTimeout() {
         addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
     }
-    
+
     public void testWriteTimeout() throws Exception {
-        
+
         Destination dest = new ActiveMQQueue("testWriteTimeout");
         messageTextPrefix = initMessagePrefix(8*1024);
         sendMessages(dest, 500);
-        
+
         URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
         LOG.info("consuming using uri: " + tcpBrokerUri);
-        
+
         SocketProxy proxy = new SocketProxy();
         proxy.setTarget(tcpBrokerUri);
         proxy.setReceiveBufferSize(receiveBufferSize);
         proxy.open();
-        
+
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(proxy.getUrl());
         Connection c = factory.createConnection();
         c.start();
         Session session = c.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer consumer = session.createConsumer(dest); 
+        MessageConsumer consumer = session.createConsumer(dest);
         proxy.pause();
         // writes should back up... writeTimeout will kick in a abort the connection
         TimeUnit.SECONDS.sleep(10);
@@ -89,23 +89,24 @@ public class SoWriteTimeoutTest extends 
         } catch (JMSException expected) {
         }
     }
-    
+
     public void testWriteTimeoutStompNio() throws Exception {
         ActiveMQQueue dest = new ActiveMQQueue("testWriteTimeout");
         messageTextPrefix = initMessagePrefix(8*1024);
         sendMessages(dest, 500);
-        
+
         URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
         LOG.info("consuming using uri: " + stompBrokerUri);
-        
+
         SocketProxy proxy = new SocketProxy();
         proxy.setTarget(new URI("tcp://localhost:" + stompBrokerUri.getPort()));
         proxy.setReceiveBufferSize(receiveBufferSize);
         proxy.open();
-        
+
         StompConnection stompConnection = new StompConnection();
         stompConnection.open(new Socket("localhost", proxy.getUrl().getPort()));
-        
+        stompConnection.getStompSocket().setTcpNoDelay(true);
+
         String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
         frame = stompConnection.receiveFrame();
@@ -113,31 +114,31 @@ public class SoWriteTimeoutTest extends 
 
         frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n"
+ Stomp.NULL;
         stompConnection.sendFrame(frame);
-        
+
         // ensure dispatch has started before pause
         frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
-        
+
         proxy.pause();
-        
+
         // writes should back up... writeTimeout will kick in a abort the connection
         TimeUnit.SECONDS.sleep(1);
 
         // see the blocked threads
         //dumpAllThreads("blocked on write");
-        
+
         // abort should be done after this
         TimeUnit.SECONDS.sleep(10);
 
         proxy.goOn();
-        
+
         // get a buffered message
         frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
-        
+
         // verify connection is dead
         try {
-            for (int i=0; i<100; i++) {
+            for (int i=0; i<200; i++) {
                 stompConnection.send("/queue/" + dest.getPhysicalName(),  "ShouldBeDeadConnectionText"
+ i);
             }
             fail("expected send to fail with timeout out connection");
@@ -145,7 +146,7 @@ public class SoWriteTimeoutTest extends 
             LOG.info("got exception on send after timeout: " + expected);
         }
     }
-    
+
     private String initMessagePrefix(int i) {
         byte[] content = new byte[i];
         return new String(content);



Mime
View raw message