activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1242911 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/
Date Fri, 10 Feb 2012 20:18:50 GMT
Author: gtully
Date: Fri Feb 10 20:18:50 2012
New Revision: 1242911

URL: http://svn.apache.org/viewvc?rev=1242911&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3653 - tidy up stomptest and resolve content length
for stomp+nio, resolve break of stompnio and stompniossl tests

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Fri Feb 10 20:18:50 2012
@@ -583,7 +583,7 @@ public class ProtocolConverter {
         HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
         acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
         if (acceptsVersions.isEmpty()) {
-            throw new ProtocolException("Invlid Protocol version, supported versions are:
" +
+            throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported
versions are: " +
                                         Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS),
true);
         } else {
             this.version = Collections.max(acceptsVersions);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
Fri Feb 10 20:18:50 2012
@@ -58,7 +58,7 @@ public class StompCodec {
                        action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
                        headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
                        String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
-                       if (contentLengthHeader != null) {
+                       if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE))
&& contentLengthHeader != null) {
                            contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
                        } else {
                            contentLength = -1;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
Fri Feb 10 20:18:50 2012
@@ -52,7 +52,6 @@ public class StompConnection {
         byte[] bytes = data.getBytes("UTF-8");
         OutputStream outputStream = stompSocket.getOutputStream();
         outputStream.write(bytes);
-        outputStream.write(0);
         outputStream.flush();
     }
 
@@ -61,7 +60,6 @@ public class StompConnection {
         OutputStream outputStream = stompSocket.getOutputStream();
         outputStream.write(bytes);
         outputStream.write(data);
-        outputStream.write(0);
         outputStream.flush();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
Fri Feb 10 20:18:50 2012
@@ -205,6 +205,8 @@ public class StompFrame implements Comma
                 buffer.append(Arrays.toString(getContent()));
             }
         }
+        // terminate the frame
+        buffer.append('\u0000');
         return buffer.toString();
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1242911&r1=1242910&r2=1242911&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Fri Feb 10 20:18:50 2012
@@ -24,6 +24,7 @@ import org.apache.activemq.broker.jmx.Br
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +37,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -47,10 +49,10 @@ public class StompTest extends Combinati
     protected String jmsUri = "vm://localhost";
 
     private BrokerService broker;
-    private StompConnection stompConnection = new StompConnection();
-    private Connection connection;
-    private Session session;
-    private ActiveMQQueue queue;
+    protected StompConnection stompConnection = new StompConnection();
+    protected Connection connection;
+    protected Session session;
+    protected ActiveMQQueue queue;
     private final String xmlObject = "<pojo>\n"
             + "  <name>Dejan</name>\n"
             + "  <city>Belgrade</city>\n"
@@ -115,7 +117,7 @@ public class StompTest extends Combinati
         connection.start();
     }
 
-    private void stompConnect() throws IOException, URISyntaxException, UnknownHostException
{
+    protected void stompConnect() throws IOException, URISyntaxException, UnknownHostException
{
         URI connectUri = new URI(bindAddress);
         stompConnection.open(createSocket(connectUri));
     }
@@ -146,7 +148,7 @@ public class StompTest extends Combinati
         }
     }
 
-    private void stompDisconnect() throws IOException {
+    protected void stompDisconnect() throws IOException {
         if (stompConnection != null) {
             stompConnection.close();
             stompConnection = null;
@@ -351,8 +353,6 @@ public class StompTest extends Combinati
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         receiver.sendFrame(frame);
 
-        waitForFrameToTakeEffect();
-
         MessageConsumer consumer = session.createConsumer(queue);
 
         frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n"
+ "\n\n" + "Hello World" + Stomp.NULL;
@@ -362,7 +362,7 @@ public class StompTest extends Combinati
         assertTrue(frame.startsWith("RECEIPT"));
         assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID)
>= 0);
 
-        TextMessage message = (TextMessage)consumer.receive(2500);
+        TextMessage message = (TextMessage)consumer.receive(10000);
         assertNotNull(message);
         assertNull("JMS Message does not contain receipt request", message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED));
 
@@ -388,7 +388,7 @@ public class StompTest extends Combinati
             frame = sender.receiveFrame();
             assertTrue(frame.startsWith("CONNECTED"));
 
-            frame = "SEND\n" + "destination:/queue/" + getQueueName()  + "\n"  + "receipt:
" + (receiptId++) + "\n\n" + "Hello World:" + (count++) + Stomp.NULL;
+            frame = "SEND\n" + "destination:/queue/" + getQueueName()  + "\n"  + "receipt:
" + (receiptId++) + "\n" + "Hello World:" + (count++) + "\n\n" +  Stomp.NULL;
             sender.sendFrame(frame);
             frame = sender.receiveFrame();
             assertTrue("" + frame, frame.startsWith("RECEIPT"));
@@ -586,7 +586,7 @@ public class StompTest extends Combinati
         }
 
         // sleep a while before publishing another set of messages
-        waitForFrameToTakeEffect();
+        TimeUnit.SECONDS.sleep(2);
 
         for (int i = 0; i < ctr; ++i) {
             data[i] = getName() + ":second:" + i;
@@ -729,7 +729,7 @@ public class StompTest extends Combinati
         assertTrue(message.getJMSRedelivered());
     }
 
-    public void testSubscribeWithClientAckAndContentLength() throws Exception {
+    public void testSubscribeWithClientAckedAndContentLength() throws Exception {
 
         String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
@@ -753,8 +753,14 @@ public class StompTest extends Combinati
         StompFrame ack = new StompFrame("ACK", ackHeaders);
         stompConnection.sendFrame(ack.format());
 
-        // Need some time for the Ack to get processed.
-        waitForFrameToTakeEffect();
+        final QueueViewMBean queueView = getProxyToQueue(getQueueName());
+        assertTrue("dequeue complete", Wait.waitFor(new Wait.Condition(){
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:"
+ queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount());
+                return queueView.getDequeueCount() == 1;
+            }
+        }));
 
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
@@ -785,10 +791,11 @@ public class StompTest extends Combinati
         assertTrue(frame.startsWith("MESSAGE"));
 
         // remove suscription
-        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n"
+ Stomp.NULL;
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt:1"
+  "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
-        waitForFrameToTakeEffect();
+        frame = stompConnection.receiveFrame();
+        assertTrue("" + frame, frame.startsWith("RECEIPT"));
 
         // send a message to our queue
         sendMessage("second message");
@@ -819,9 +826,7 @@ public class StompTest extends Combinati
         frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
-        waitForFrameToTakeEffect();
-
-        TextMessage message = (TextMessage)consumer.receive(2500);
+        TextMessage message = (TextMessage)consumer.receive(10000);
         assertNotNull("Should have received a message", message);
     }
 
@@ -853,11 +858,8 @@ public class StompTest extends Combinati
         frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
-        // This test case is currently failing
-        waitForFrameToTakeEffect();
-
         // only second msg should be received since first msg was rolled back
-        TextMessage message = (TextMessage)consumer.receive(2500);
+        TextMessage message = (TextMessage)consumer.receive(10000);
         assertNotNull(message);
         assertEquals("second message", message.getText().trim());
     }
@@ -868,16 +870,11 @@ public class StompTest extends Combinati
 
         stompConnection.sendFrame(frame);
 
-        // This test case is currently failing
-        waitForFrameToTakeEffect();
-
         assertClients(2);
 
         // now lets kill the stomp connection
         stompConnection.close();
 
-        Thread.sleep(2000);
-
         assertClients(1);
     }
 
@@ -1486,8 +1483,6 @@ public class StompTest extends Combinati
         stompConnection.ack(frame5, "tx3");
         stompConnection.commit("tx3");
 
-        waitForFrameToTakeEffect();
-
         stompDisconnect();
     }
 
@@ -1725,9 +1720,13 @@ public class StompTest extends Combinati
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
-        waitForFrameToTakeEffect();
-
-        QueueViewMBean queueView = getProxyToQueue(getQueueName());
+        final QueueViewMBean queueView = getProxyToQueue(getQueueName());
+        Wait.waitFor(new Wait.Condition(){
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == 2;
+            }
+        });
         assertEquals(2, queueView.getDispatchCount());
         assertEquals(2, queueView.getDequeueCount());
         assertEquals(0, queueView.getQueueSize());
@@ -1896,7 +1895,14 @@ public class StompTest extends Combinati
         return proxy;
     }
 
-    protected void assertClients(int expected) throws Exception {
+    protected void assertClients(final int expected) throws Exception {
+        Wait.waitFor(new Wait.Condition()
+        {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getBroker().getClients().length == expected;
+            }
+        });
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;
 
@@ -1936,8 +1942,6 @@ public class StompTest extends Combinati
         frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
-        waitForFrameToTakeEffect();
-
         stompConnection.sendFrame(test);
 
         // We only want one of them, to trigger the shutdown and potentially
@@ -1959,15 +1963,6 @@ public class StompTest extends Combinati
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
 
-        waitForFrameToTakeEffect();
-
         stompConnection.close();
     }
-
-    protected void waitForFrameToTakeEffect() throws InterruptedException {
-        // bit of a dirty hack :)
-        // another option would be to force some kind of receipt to be returned
-        // from the frame
-        Thread.sleep(2000);
-    }
 }



Mime
View raw message