activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r558044 - in /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp: StompConnection.java StompSubscriptionRemoveTest.java StompTest.java
Date Fri, 20 Jul 2007 16:47:42 GMT
Author: rajdavies
Date: Fri Jul 20 09:47:40 2007
New Revision: 558044

URL: http://svn.apache.org/viewvc?view=rev&rev=558044
Log:
Applying patch for http://issues.apache.org/activemq/browse/AMQ-1323

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java?view=auto&rev=558044
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
Fri Jul 20 09:47:40 2007
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+
+public class StompConnection {
+
+    public static final long RECEIVE_TIMEOUT = 10000;
+    
+    private Socket stompSocket;
+    private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
+
+    public void open(String host, int port) throws IOException, UnknownHostException {
+        stompSocket = new Socket(host, port);
+    }
+
+	public void close() throws IOException {
+		if (stompSocket != null) {
+		    stompSocket.close();
+	        stompSocket = null;
+		}
+	}
+	
+    public void sendFrame(String data) throws Exception {
+        byte[] bytes = data.getBytes("UTF-8");
+        OutputStream outputStream = stompSocket.getOutputStream();
+        outputStream.write(bytes);
+        outputStream.write(0);
+        outputStream.flush();
+    }
+
+    public String receiveFrame() throws Exception {
+    	return receiveFrame(RECEIVE_TIMEOUT);
+    }
+
+    private String receiveFrame(long timeOut) throws Exception {
+        stompSocket.setSoTimeout((int) timeOut);
+        InputStream is = stompSocket.getInputStream();
+        int c = 0;
+        for (;;) {
+            c = is.read();
+            if (c < 0) {
+                throw new IOException("socket closed.");
+            }
+            else if (c == 0) {
+                c = is.read();
+                if (c != '\n') {
+                	throw new IOException("Expecting stomp frame to terminate with \0\n");
+                }
+                byte[] ba = inputBuffer.toByteArray();
+                inputBuffer.reset();
+                return new String(ba, "UTF-8");
+            }
+            else {
+                inputBuffer.write(c);
+            }
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java?view=diff&rev=558044&r1=558043&r2=558044
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
Fri Jul 20 09:47:40 2007
@@ -18,13 +18,9 @@
 package org.apache.activemq.transport.stomp;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -45,10 +41,11 @@
  */
 public class StompSubscriptionRemoveTest extends TestCase {
     private static final Log log = LogFactory.getLog(StompSubscriptionRemoveTest.class);
+    private static final String COMMAND_MESSAGE = "MESSAGE";
+    private static final String HEADER_MESSAGE_ID = "message-id";
+    private static final int STOMP_PORT = 61613;
 
-    private Socket stompSocket;
-    private ByteArrayOutputStream inputBuffer;
-
+    private StompConnection stompConnection = new StompConnection();
     
     public void testRemoveSubscriber() throws Exception {
         BrokerService broker = new BrokerService();
@@ -68,104 +65,64 @@
             log.debug("Sending: " + idx);
         }
         producer.close();
-        // consumer.close();
         session.close();
         connection.close();
 
-        stompSocket = new Socket("localhost", 61613);
-        inputBuffer = new ByteArrayOutputStream();
+        stompConnection.open("localhost", STOMP_PORT);
 
         String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" +
"\n";
-        sendFrame(connect_frame);
+        stompConnection.sendFrame(connect_frame);
 
-        String f = receiveFrame(100000);
+        stompConnection.receiveFrame();
         String frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n"
+ "ack:client\n\n";
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
+        
         int messagesCount = 0;
         int count = 0;
         while (count < 2) {
-            String receiveFrame = receiveFrame(10000);
-            DataInput input = new DataInputStream(new ByteArrayInputStream(receiveFrame.getBytes()));
-            String line;
-            while (true) {
-                line = input.readLine();
-                if (line == null) {
-                    throw new IOException("connection was closed");
-                }
-                else {
-                    line = line.trim();
-                    if (line.length() > 0) {
-                        break;
-                    }
-                }
-            }
-            line = input.readLine();
-            if (line == null) {
-                throw new IOException("connection was closed");
-            }
-            String messageId = line.substring(line.indexOf(':') + 1);
-            messageId = messageId.trim();
-            String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n";
-            sendFrame(ackmessage);
-            log.debug(receiveFrame);
-            //Thread.sleep(1000);
+            String receiveFrame = stompConnection.receiveFrame();
+            log.debug("Received: " + receiveFrame);
+            assertEquals("Unexpected frame received", COMMAND_MESSAGE, getCommand(receiveFrame));
+            String messageId = getHeaderValue(receiveFrame, HEADER_MESSAGE_ID);
+            String ackmessage = "ACK\n" + HEADER_MESSAGE_ID + ":" + messageId + "\n\n";
+            stompConnection.sendFrame(ackmessage);
+            // Thread.sleep(1000);
             ++messagesCount;
             ++count;
         }
 
-        sendFrame("DISCONNECT\n\n");
+        stompConnection.sendFrame("DISCONNECT\n\n");
         Thread.sleep(1000);
-        stompSocket.close();
+        stompConnection.close();
 
-        stompSocket = new Socket("localhost", 61613);
-        inputBuffer = new ByteArrayOutputStream();
+        stompConnection.open("localhost", STOMP_PORT);
 
         connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n";
-        sendFrame(connect_frame);
+        stompConnection.sendFrame(connect_frame);
 
-        f = receiveFrame(5000);
+        stompConnection.receiveFrame();
 
         frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n";
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
         try {
             while (count != 2000) {
-                String receiveFrame = receiveFrame(5000);
-                DataInput input = new DataInputStream(new ByteArrayInputStream(receiveFrame.getBytes()));
-                String line;
-                while (true) {
-                    line = input.readLine();
-                    if (line == null) {
-                        throw new IOException("connection was closed");
-                    }
-                    else {
-                        line = line.trim();
-                        if (line.length() > 0) {
-                            break;
-                        }
-                    }
-                }
-
-                line = input.readLine();
-                if (line == null) {
-                    throw new IOException("connection was closed");
-                }
-                String messageId = line.substring(line.indexOf(':') + 1);
-                messageId = messageId.trim();
-                String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n";
-                sendFrame(ackmessage);
+                String receiveFrame = stompConnection.receiveFrame();
                 log.debug("Received: " + receiveFrame);
+                assertEquals("Unexpected frame received", COMMAND_MESSAGE, getCommand(receiveFrame));
+                String messageId = getHeaderValue(receiveFrame, HEADER_MESSAGE_ID);
+                String ackmessage = "ACK\n" + HEADER_MESSAGE_ID + ":" + messageId.trim()
+ "\n\n";
+                stompConnection.sendFrame(ackmessage);
                 //Thread.sleep(1000);
                 ++messagesCount;
                 ++count;
             }
-
         }
         catch (IOException ex) {
             ex.printStackTrace();
         }
 
-        sendFrame("DISCONNECT\n\n");
-        stompSocket.close();
+        stompConnection.sendFrame("DISCONNECT\n\n");
+        stompConnection.close();
         broker.stop();
 
         log.info("Total messages received: " + messagesCount);
@@ -178,36 +135,35 @@
         // Subscription without any connections
     }
 
-    public void sendFrame(String data) throws Exception {
-        byte[] bytes = data.getBytes("UTF-8");
-        OutputStream outputStream = stompSocket.getOutputStream();
-        outputStream.write(bytes);
-        outputStream.write(0);
-        outputStream.flush();
+    protected String getDestinationName() {
+        return getClass().getName() + "." + getName();
     }
 
-    public String receiveFrame(long timeOut) throws Exception {
-        stompSocket.setSoTimeout((int) timeOut);
-        InputStream is = stompSocket.getInputStream();
-        int c = 0;
-        for (;;) {
-            c = is.read();
-            if (c < 0) {
-                throw new IOException("socket closed.");
-            }
-            else if (c == 0) {
-                c = is.read();
-                byte[] ba = inputBuffer.toByteArray();
-                inputBuffer.reset();
-                return new String(ba, "UTF-8");
-            }
-            else {
-                inputBuffer.write(c);
-            }
-        }
+    // These two methods could move to a utility class
+    protected String getCommand(String frame) {
+    	return frame.substring(0, frame.indexOf('\n') + 1).trim();
     }
 
-    protected String getDestinationName() {
-        return getClass().getName() + "." + getName();
+    protected String getHeaderValue (String frame, String header) throws IOException {
+        DataInput input = new DataInputStream(new ByteArrayInputStream(frame.getBytes()));
+        String line;
+        for (int idx = 0; /*forever, sort of*/; ++idx) {
+            line = input.readLine();
+            if (line == null) {
+            	// end of message, no headers
+            	return null;
+            } 
+            line = line.trim();
+            if (line.length() == 0) {
+            	// start body, no headers from here on
+            	return null;
+            } 
+            if (idx > 0) {     // Ignore command line
+            	int pos = line.indexOf(':');
+            	if (header.equals(line.substring(0, pos))) {
+            		return line.substring(pos + 1).trim();
+            	}
+            }
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?view=diff&rev=558044&r1=558043&r2=558044
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Fri Jul 20 09:47:40 2007
@@ -22,14 +22,10 @@
 import org.apache.activemq.broker.*;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.transport.stomp.Stomp;
 
 import javax.jms.*;
 import javax.jms.Connection;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.URI;
@@ -40,8 +36,7 @@
 
     private BrokerService broker;
     private TransportConnector connector;
-    private Socket stompSocket;
-    private ByteArrayOutputStream inputBuffer;
+    private StompConnection stompConnection = new StompConnection();
     private Connection connection;
     private Session session;
     private ActiveMQQueue queue;
@@ -55,8 +50,7 @@
         broker.start();
 
         URI connectUri = connector.getConnectUri();
-        stompSocket = createSocket(connectUri);
-        inputBuffer = new ByteArrayOutputStream();
+        stompConnection.open("127.0.0.1", connectUri.getPort());
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
         connection = cf.createConnection();
@@ -66,7 +60,7 @@
     }
 
     protected Socket createSocket(URI connectUri) throws IOException {
-        return new Socket("127.0.0.1", connectUri.getPort());
+        return new Socket();
     }
 
     protected String getQueueName() {
@@ -75,43 +69,10 @@
 
     protected void tearDown() throws Exception {
         connection.close();
-        if (stompSocket != null) {
-            stompSocket.close();
-        }
+        stompConnection.close();
         broker.stop();
     }
 
-    public void sendFrame(String data) throws Exception {
-        byte[] bytes = data.getBytes("UTF-8");
-        OutputStream outputStream = stompSocket.getOutputStream();
-        for (int i = 0; i < bytes.length; i++) {
-            outputStream.write(bytes[i]);
-        }
-        outputStream.flush();
-    }
-
-    public String receiveFrame(long timeOut) throws Exception {
-        stompSocket.setSoTimeout((int) timeOut);
-        InputStream is = stompSocket.getInputStream();
-        int c=0;
-        for(;;) {
-            c = is.read();
-            if( c < 0 ) {
-                throw new IOException("socket closed.");
-            } else if( c == 0 ) {
-                c = is.read();
-                assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
-                byte[] ba = inputBuffer.toByteArray();
-                inputBuffer.reset();
-                return new String(ba, "UTF-8");
-            } else {
-                inputBuffer.write(c);
-            }
-        }
-    }
-
-
-
     public void sendMessage(String msg) throws Exception {
         sendMessage(msg, "foo", "xyz");
     }
@@ -128,15 +89,14 @@
         BytesMessage message = session.createBytesMessage();
         message.writeBytes(msg);
         producer.send(message);
-
     }
 
     public void testConnect() throws Exception {
 
         String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" +
"request-id: 1\n" + "\n" + Stomp.NULL;
-        sendFrame(connect_frame);
+        stompConnection.sendFrame(connect_frame);
 
-        String f = receiveFrame(10000);
+        String f = stompConnection.receiveFrame();
         assertTrue(f.startsWith("CONNECTED"));
         assertTrue(f.indexOf("response-id:1") >= 0);
 
@@ -151,9 +111,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -162,7 +122,7 @@
             "Hello World" +
             Stomp.NULL;
 
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         TextMessage message = (TextMessage) consumer.receive(1000);
         assertNotNull(message);
@@ -184,9 +144,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -196,7 +156,7 @@
             "Hello World" +
             Stomp.NULL;
 
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         TextMessage message = (TextMessage) consumer.receive(1000);
         assertNotNull(message);
@@ -213,9 +173,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -226,7 +186,7 @@
             "Hello World" +
             Stomp.NULL;
 
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         TextMessage message = (TextMessage) consumer.receive(1000);
         assertNotNull(message);
@@ -244,9 +204,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -261,7 +221,7 @@
             "Hello World" +
             Stomp.NULL;
 
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         TextMessage message = (TextMessage) consumer.receive(1000);
         assertNotNull(message);
@@ -284,9 +244,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(100000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -294,18 +254,18 @@
             "destination:/queue/" + getQueueName() + "\n" +
             "ack:auto\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         sendMessage(getName());
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
 
         frame =
             "DISCONNECT\n" +
             "\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
     }
 
         public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
@@ -315,9 +275,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(100000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -325,11 +285,11 @@
             "destination:/queue/" + getQueueName() + "\n" +
             "ack:auto\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         sendBytesMessage(new byte[] {1,2,3,4,5});
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
 
         Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
@@ -343,7 +303,7 @@
             "DISCONNECT\n" +
             "\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
     }
 
     public void testSubscribeWithMessageSentWithProperties() throws Exception {
@@ -353,9 +313,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(100000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -363,7 +323,7 @@
             "destination:/queue/" + getQueueName() + "\n" +
             "ack:auto\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
 
         MessageProducer producer = session.createProducer(queue);
@@ -378,7 +338,7 @@
         message.setShortProperty("s", (short) 12);
         producer.send(message);
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
 
 //        System.out.println("out: "+frame);
@@ -387,7 +347,7 @@
             "DISCONNECT\n" +
             "\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
     }
 
     public void testMessagesAreInOrder() throws Exception {
@@ -399,9 +359,9 @@
                 "login: brianm\n" +
                 "passcode: wombats\n\n" +
                 Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(100000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -409,7 +369,7 @@
                 "destination:/queue/" + getQueueName() + "\n" +
                 "ack:auto\n\n" +
                 Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         for (int i = 0; i < ctr; ++i) {
             data[i] = getName() + i;
@@ -417,7 +377,7 @@
         }
 
         for (int i = 0; i < ctr; ++i) {
-            frame = receiveFrame(1000);
+            frame = stompConnection.receiveFrame();
             assertTrue("Message not in order", frame.indexOf(data[i]) >=0 );
         }
 
@@ -430,7 +390,7 @@
         }
 
         for (int i = 0; i < ctr; ++i) {
-            frame = receiveFrame(1000);
+            frame = stompConnection.receiveFrame();
             assertTrue("Message not in order", frame.indexOf(data[i]) >=0 );
         }
 
@@ -438,7 +398,7 @@
                 "DISCONNECT\n" +
                 "\n\n" +
                 Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
     }
 
 
@@ -449,9 +409,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        frame = receiveFrame(100000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -460,12 +420,12 @@
             "selector: foo = 'zzz'\n" +
             "ack:auto\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         sendMessage("Ignored message", "foo", "1234");
         sendMessage("Real message", "foo", "zzz");
 
-        frame = receiveFrame(10000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
         assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real
message") > 0);
 
@@ -473,7 +433,7 @@
             "DISCONNECT\n" +
             "\n\n"+
             Stomp.NULL;
-       sendFrame(frame);
+       stompConnection.sendFrame(frame);
     }
 
 
@@ -484,9 +444,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-       sendFrame(frame);
+       stompConnection.sendFrame(frame);
 
-       frame = receiveFrame(10000);
+       frame = stompConnection.receiveFrame();
        assertTrue(frame.startsWith("CONNECTED"));
 
 
@@ -497,25 +457,22 @@
             Stomp.NULL;
 
 
-       sendFrame(frame);
+       stompConnection.sendFrame(frame);
        sendMessage(getName());
-       frame = receiveFrame(10000);
+       frame = stompConnection.receiveFrame();
        assertTrue(frame.startsWith("MESSAGE"));
 
        frame =
             "DISCONNECT\n" +
             "\n\n"+
             Stomp.NULL;
-       sendFrame(frame);
+       stompConnection.sendFrame(frame);
 
        // message should be received since message was not acknowledged
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull(message);
        assertTrue(message.getJMSRedelivered());
-
-
-
     }
 
     public void testUnsubscribe() throws Exception {
@@ -525,8 +482,8 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
-        frame = receiveFrame(100000);
+        stompConnection.sendFrame(frame);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame =
@@ -534,14 +491,13 @@
             "destination:/queue/" + getQueueName() + "\n" +
             "ack:auto\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         //send a message to our queue
         sendMessage("first message");
 
-
         //receive message from socket
-        frame = receiveFrame(1000);
+        frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
 
         //remove suscription
@@ -550,7 +506,7 @@
             "destination:/queue/" + getQueueName() + "\n" +
             "\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         waitForFrameToTakeEffect();
 
@@ -559,7 +515,7 @@
 
 
         try {
-            frame = receiveFrame(1000);
+            frame = stompConnection.receiveFrame();
             log.info("Received frame: " + frame);
             fail("No message should have been received since subscription was removed");
         }catch (SocketTimeoutException e){
@@ -577,9 +533,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        String f = receiveFrame(1000);
+        String f = stompConnection.receiveFrame();
         assertTrue(f.startsWith("CONNECTED"));
 
         frame =
@@ -587,7 +543,7 @@
             "transaction: tx1\n" +
             "\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         frame =
             "SEND\n" +
@@ -596,14 +552,14 @@
             "\n\n" +
             "Hello World" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         frame =
             "COMMIT\n" +
             "transaction: tx1\n" +
             "\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         waitForFrameToTakeEffect();
 
@@ -619,9 +575,9 @@
             "login: brianm\n" +
             "passcode: wombats\n\n"+
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
-        String f = receiveFrame(1000);
+        String f = stompConnection.receiveFrame();
         assertTrue(f.startsWith("CONNECTED"));
 
         frame =
@@ -629,7 +585,7 @@
             "transaction: tx1\n" +
             "\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         frame =
             "SEND\n" +
@@ -638,7 +594,7 @@
             "\n" +
             "first message" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         //rollback first message
         frame =
@@ -646,14 +602,14 @@
             "transaction: tx1\n" +
             "\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         frame =
             "BEGIN\n" +
             "transaction: tx1\n" +
             "\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         frame =
             "SEND\n" +
@@ -662,14 +618,14 @@
             "\n" +
             "second message" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         frame =
             "COMMIT\n" +
             "transaction: tx1\n" +
             "\n\n" +
             Stomp.NULL;
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         // This test case is currently failing
         waitForFrameToTakeEffect();
@@ -688,16 +644,15 @@
             "passcode: wombats\n\n"+
             Stomp.NULL;
 
-        sendFrame(frame);
+        stompConnection.sendFrame(frame);
 
         // This test case is currently failing
         waitForFrameToTakeEffect();
 
         assertClients(2);
 
-        // now lets kill the socket
-        stompSocket.close();
-        stompSocket = null;
+        // now lets kill the stomp connection
+        stompConnection.close();
 
         Thread.sleep(2000);
 



Mime
View raw message