activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1157238 [3/3] - in /activemq/trunk: activemq-core/ activemq-core/src/main/filtered-resources/ activemq-core/src/main/filtered-resources/org/ activemq-core/src/main/filtered-resources/org/apache/ activemq-core/src/main/filtered-resources/or...
Date Fri, 12 Aug 2011 20:29:31 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Fri Aug 12 20:29:29 2011
@@ -20,16 +20,22 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
 import javax.net.SocketFactory;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.transport.Transport;
@@ -64,7 +70,6 @@ public class TcpTransport extends Transp
     protected DataInputStream dataIn;
     protected TimeStampStream buffOut = null;
 
-
     /**
      * The Traffic Class to be set on the socket.
      */
@@ -636,7 +641,6 @@ public class TcpTransport extends Transp
         return receiveCounter;
     }
     
-
     /**
      * @param sock The socket on which to set the Traffic Class.
      * @return Whether or not the Traffic Class was set on the given socket.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
Fri Aug 12 20:29:29 2011
@@ -79,6 +79,7 @@ public class TcpTransportFactory extends
         return new TcpTransportServer(this, location, serverSocketFactory);
     }
 
+    @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
 
         TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
@@ -98,11 +99,10 @@ public class TcpTransportFactory extends
 
         boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
         if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
-            transport = new InactivityMonitor(transport, format);
+            transport = createInactivityMonitor(transport, format);
             IntrospectionSupport.setProperties(transport, options);
         }
 
-
         // Only need the WireFormatNegotiator if using openwire
         if (format instanceof OpenWireFormat) {
             transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
@@ -162,4 +162,8 @@ public class TcpTransportFactory extends
     protected SocketFactory createSocketFactory() throws IOException {
         return SocketFactory.getDefault();
     }
+
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        return new InactivityMonitor(transport, format);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Fri Aug 12 20:29:29 2011
@@ -27,7 +27,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1157238&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
Fri Aug 12 20:29:29 2011
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Stomp11Test extends CombinationTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
+
+    protected String bindAddress = "stomp://localhost:61613";
+    protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
+    protected String jmsUri = "vm://localhost";
+
+    private BrokerService broker;
+    private StompConnection stompConnection = new StompConnection();
+
+    @Override
+    protected void setUp() throws Exception {
+
+        broker = BrokerFactory.createBroker(new URI(confUri));
+        broker.start();
+        broker.waitUntilStarted();
+
+        stompConnect();
+    }
+
+    private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
+        URI connectUri = new URI(bindAddress);
+        stompConnection.open(createSocket(connectUri));
+    }
+
+    protected Socket createSocket(URI connectUri) throws IOException {
+        return new Socket("127.0.0.1", connectUri.getPort());
+    }
+
+    protected String getQueueName() {
+        return getClass().getName() + "." + getName();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            stompDisconnect();
+        } catch(Exception e) {
+            // Some tests explicitly disconnect from stomp so can ignore
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    private void stompDisconnect() throws IOException {
+        if (stompConnection != null) {
+            stompConnection.close();
+            stompConnection = null;
+        }
+    }
+
+    public void testConnect() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "request-id: 1\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("response-id:1") >= 0);
+        assertTrue(f.indexOf("version:1.1") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testConnectWithVersionOptions() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.0,1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.1") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testConnectWithValidFallback() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.0,10.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.0") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testConnectWithInvalidFallback() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:9.0,10.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("ERROR"));
+        assertTrue(f.indexOf("version") >= 0);
+        assertTrue(f.indexOf("message:") >= 0);
+    }
+
+    public void testHeartbeats() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:0,1000\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.1") >= 0);
+        assertTrue(f.indexOf("heart-beat:") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        LOG.debug("Broker sent: " + f);
+
+        stompConnection.getStompSocket().getOutputStream().write('\n');
+
+        DataInputStream in = new DataInputStream(stompConnection.getStompSocket().getInputStream());
+        in.read();
+        {
+            long startTime = System.currentTimeMillis();
+            int input = in.read();
+            assertEquals("did not receive the correct hear beat value", '\n', input);
+            long endTime = System.currentTimeMillis();
+            assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900);
+        }
+        {
+            long startTime = System.currentTimeMillis();
+            int input = in.read();
+            assertEquals("did not receive the correct hear beat value", '\n', input);
+            long endTime = System.currentTimeMillis();
+            assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900);
+        }
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testHeartbeatsDropsIdleConnection() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:1000,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.1") >= 0);
+        assertTrue(f.indexOf("heart-beat:") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+        LOG.debug("Broker sent: " + f);
+
+        long startTime = System.currentTimeMillis();
+
+        try {
+            f = stompConnection.receiveFrame();
+            LOG.debug("Broker sent: " + f);
+            fail();
+        } catch(Exception e) {
+        }
+
+        long endTime = System.currentTimeMillis();
+        assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000);
+    }
+
+    public void testRejectInvalidHeartbeats1() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("ERROR"));
+        assertTrue(f.indexOf("heart-beat") >= 0);
+        assertTrue(f.indexOf("message:") >= 0);
+    }
+
+    public void testRejectInvalidHeartbeats2() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:T,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("ERROR"));
+        assertTrue(f.indexOf("heart-beat") >= 0);
+        assertTrue(f.indexOf("message:") >= 0);
+    }
+
+    public void testRejectInvalidHeartbeats3() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:100,10,50\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("ERROR"));
+        assertTrue(f.indexOf("heart-beat") >= 0);
+        assertTrue(f.indexOf("message:") >= 0);
+    }
+
+    public void testSubscribeAndUnsubscribe() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(message);
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        Thread.sleep(2000);
+
+        stompConnection.sendFrame(message);
+
+        try {
+            frame = stompConnection.receiveFrame();
+            LOG.info("Received frame: " + frame);
+            fail("No message should have been received since subscription was removed");
+        } catch (SocketTimeoutException e) {
+        }
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testSubscribeWithNoId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testUnsubscribeWithNoId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        Thread.sleep(2000);
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testAckMessageWithId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(message);
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame received = stompConnection.receive();
+        assertTrue(received.getAction().equals("MESSAGE"));
+
+        frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
+                received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testAckMessageWithNoId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(message);
+
+        String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                           "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame received = stompConnection.receive();
+        assertTrue(received.getAction().equals("MESSAGE"));
+
+        String ack = "ACK\n" + "message-id:" +
+                     received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(ack);
+
+        StompFrame error = stompConnection.receive();
+        assertTrue(error.getAction().equals("ERROR"));
+
+        String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsub);
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testQueueBrowerSubscription() throws Exception {
+
+        final int MSG_COUNT = 10;
+
+        String connectFrame = "STOMP\n" +
+                              "login: system\n" +
+                              "passcode: manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        for(int i = 0; i < MSG_COUNT; ++i) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
+                             "receipt:0\n" +
+                             "\n" + "Hello World {" + i + "}" + Stomp.NULL;
+            stompConnection.sendFrame(message);
+            StompFrame repsonse = stompConnection.receive();
+            assertEquals("0", repsonse.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+        }
+
+        String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                           "id:12345\n" + "browser:true\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        for(int i = 0; i < MSG_COUNT; ++i) {
+            StompFrame message = stompConnection.receive();
+            assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+            assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+        }
+
+        // We should now get a browse done message
+        StompFrame browseDone = stompConnection.receive();
+        LOG.debug("Browse Done: " + browseDone.toString());
+        assertEquals(Stomp.Responses.MESSAGE, browseDone.getAction());
+        assertEquals("12345", browseDone.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+        assertEquals("end", browseDone.getHeaders().get(Stomp.Headers.Message.BROWSER));
+        assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null);
+
+        String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsub);
+
+        Thread.sleep(2000);
+
+        subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        for(int i = 0; i < MSG_COUNT; ++i) {
+            StompFrame message = stompConnection.receive();
+            assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+            assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+        }
+
+        stompConnection.sendFrame(unsub);
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Fri Aug 12 20:29:29 2011
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -36,6 +37,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.ObjectName;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
@@ -318,7 +320,7 @@ public class StompTest extends Combinati
         assertEquals("Hello World", message.getText());
         assertEquals("getJMSPriority", 4, message.getJMSPriority());
     }
-    
+
     public void testReceipts() throws Exception {
 
         StompConnection receiver = new StompConnection();
@@ -449,7 +451,7 @@ public class StompTest extends Combinati
 
     public void testSendMultipleBytesMessages() throws Exception {
 
-    	final int MSG_COUNT = 50;
+        final int MSG_COUNT = 50;
 
         String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
Fri Aug 12 20:29:29 2011
@@ -21,9 +21,9 @@ import java.security.cert.X509Certificat
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.TransportSupport;
-import org.apache.activemq.transport.stomp.LegacyFrameTranslator;
 import org.apache.activemq.transport.stomp.ProtocolConverter;
 import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.StompInactivityMonitor;
 import org.apache.activemq.transport.stomp.StompTransport;
 import org.apache.activemq.transport.stomp.StompWireFormat;
 import org.apache.activemq.util.ByteSequence;
@@ -32,19 +32,19 @@ import org.apache.activemq.util.ServiceS
 import org.eclipse.jetty.websocket.WebSocket;
 
 /**
- * 
+ *
  * Implements web socket and mediates between servlet and the broker
  *
  */
 class StompSocket extends TransportSupport implements WebSocket, StompTransport {
     Outbound outbound;
-    ProtocolConverter protocolConverter = new ProtocolConverter(this, new LegacyFrameTranslator(), null);
+    ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
     StompWireFormat wireFormat = new StompWireFormat();
 
     public void onConnect(Outbound outbound) {
         this.outbound=outbound;
     }
-    
+
     public void onMessage(byte frame, byte[] data,int offset, int length) {}
 
     public void onMessage(byte frame, String data) {
@@ -91,4 +91,14 @@ class StompSocket extends TransportSuppo
     public void sendToStomp(StompFrame command) throws IOException {
         outbound.sendMessage(WebSocket.SENTINEL_FRAME, command.format());
     }
+
+    @Override
+    public StompInactivityMonitor getInactivityMonitor() {
+        return null;
+    }
+
+    @Override
+    public StompWireFormat getWireFormat() {
+        return this.wireFormat;
+    }
 }



Mime
View raw message