activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1304984 [2/2] - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/transport/mqtt/ src/main/resources/META-INF/services/org/apache/activemq/transport/ src/main/resources/META-INF/services/org/apache/activemq/wireformat/...
Date Sun, 25 Mar 2012 06:33:50 GMT
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
(from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
Sun Mar 25 06:33:49 2012
@@ -14,30 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
 
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
-
-import java.io.*;
-import java.util.HashMap;
-import java.util.Map;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.mqtt.codec.MQTTFrame;
 
 /**
  * Implements marshalling and unmarshalling the <a
- * href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * href="http://mqtt.org/">MQTT</a> protocol.
  */
-public class StompWireFormat implements WireFormat {
+public class MQTTWireFormat implements WireFormat {
 
-    private static final byte[] NO_DATA = new byte[] {};
-    private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};
 
-    private static final int MAX_COMMAND_LENGTH = 1024;
-    private static final int MAX_HEADER_LENGTH = 1024 * 10;
-    private static final int MAX_HEADERS = 1000;
-    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
 
     private boolean encodingEnabled = false;
     private int version = 1;
@@ -56,254 +55,70 @@ public class StompWireFormat implements 
         return unmarshal(dis);
     }
 
-    public void marshal(Object command, DataOutput os) throws IOException {
-        StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
-
-        if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) {
-            os.write(Stomp.BREAK);
-            return;
-        }
-
-        StringBuilder buffer = new StringBuilder();
-        buffer.append(stomp.getAction());
-        buffer.append(Stomp.NEWLINE);
-
-        // Output the headers.
-        for (Map.Entry<String, String> entry : stomp.getHeaders().entrySet()) {
-            buffer.append(entry.getKey());
-            buffer.append(Stomp.Headers.SEPERATOR);
-            buffer.append(encodeHeader(entry.getValue()));
-            buffer.append(Stomp.NEWLINE);
+    public void marshal(Object command, DataOutput dataOut) throws IOException {
+        MQTTFrame frame = (MQTTFrame) command;
+        dataOut.write(frame.header());
+
+        int remaining = 0;
+        for (Buffer buffer : frame.buffers) {
+            remaining += buffer.length;
+        }
+        do {
+            byte digit = (byte) (remaining & 0x7F);
+            remaining >>>= 7;
+            if (remaining > 0) {
+                digit |= 0x80;
+            }
+            dataOut.write(digit);
+        } while (remaining > 0);
+        for (Buffer buffer : frame.buffers) {
+            dataOut.write(buffer.data, buffer.offset, buffer.length);
         }
-
-        // Add a newline to seperate the headers from the content.
-        buffer.append(Stomp.NEWLINE);
-
-        os.write(buffer.toString().getBytes("UTF-8"));
-        os.write(stomp.getContent());
-        os.write(END_OF_FRAME);
     }
 
-    public Object unmarshal(DataInput in) throws IOException {
-
-        try {
-
-            // parse action
-            String action = parseAction(in);
-
-            // Parse the headers
-            HashMap<String, String> headers = parseHeaders(in);
-
-            // Read in the data part.
-            byte[] data = NO_DATA;
-            String contentLength = headers.get(Stomp.Headers.CONTENT_LENGTH);
-            if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE))
&& contentLength != null) {
-
-                // Bless the client, he's telling us how much data to read in.
-                int length = parseContentLength(contentLength);
-
-                data = new byte[length];
-                in.readFully(data);
-
-                if (in.readByte() != 0) {
-                    throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were
read and " + "there was no trailing null byte", true);
-                }
-
+    public Object unmarshal(DataInput dataIn) throws IOException {
+        byte header = dataIn.readByte();
+
+        byte digit = 0;
+
+        int multiplier = 1;
+        int length = 0;
+        do {
+            digit = dataIn.readByte();
+            length += (digit & 0x7F) * multiplier;
+            multiplier <<= 7;
+        }
+        while ((digit & 0x80) != 0);
+        if (length >= 0) {
+            if (length > MAX_MESSAGE_LENGTH) {
+                throw new IOException("The maximum message length was exceeded");
+            }
+
+            if (length > 0) {
+                byte[] data = new byte[length];
+                dataIn.readFully(data);
+                Buffer body = new Buffer(data);
+                return new MQTTFrame(body).header(header);
             } else {
-
-                // We don't know how much to read.. data ends when we hit a 0
-                byte b;
-                ByteArrayOutputStream baos = null;
-                while ((b = in.readByte()) != 0) {
-
-                    if (baos == null) {
-                        baos = new ByteArrayOutputStream();
-                    } else if (baos.size() > MAX_DATA_LENGTH) {
-                        throw new ProtocolException("The maximum data length was exceeded",
true);
-                    }
-
-                    baos.write(b);
-                }
-
-                if (baos != null) {
-                    baos.close();
-                    data = baos.toByteArray();
-                }
+                return new MQTTFrame().header(header);
             }
-
-            return new StompFrame(action, headers, data);
-
-        } catch (ProtocolException e) {
-            return new StompFrameError(e);
         }
+        return null;
     }
 
-    private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException
{
-        ByteSequence sequence = readHeaderLine(in, maxLength, errorMessage);
-        return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(),
"UTF-8").trim();
-    }
-
-    private ByteSequence readHeaderLine(DataInput in, int maxLength, String errorMessage)
throws IOException {
-        byte b;
-        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
-        while ((b = in.readByte()) != '\n') {
-            if (baos.size() > maxLength) {
-                throw new ProtocolException(errorMessage, true);
-            }
-            baos.write(b);
-        }
-        baos.close();
-        return baos.toByteSequence();
-    }
-
-    protected String parseAction(DataInput in) throws IOException {
-        String action = null;
-
-        // skip white space to next real action line
-        while (true) {
-            action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-            if (action == null) {
-                throw new IOException("connection was closed");
-            } else {
-                action = action.trim();
-                if (action.length() > 0) {
-                    break;
-                }
-            }
-        }
-        return action;
-    }
-
-    protected HashMap<String, String> parseHeaders(DataInput in) throws IOException
{
-        HashMap<String, String> headers = new HashMap<String, String>(25);
-        while (true) {
-            ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header
length was exceeded");
-            if (line != null && line.length > 1) {
-
-                if (headers.size() > MAX_HEADERS) {
-                    throw new ProtocolException("The maximum number of headers was exceeded",
true);
-                }
-
-                try {
-
-                    ByteArrayInputStream headerLine = new ByteArrayInputStream(line);
-                    ByteArrayOutputStream stream = new ByteArrayOutputStream(line.length);
-
-                    // First complete the name
-                    int result = -1;
-                    while ((result = headerLine.read()) != -1) {
-                        if (result != ':') {
-                            stream.write(result);
-                        } else {
-                            break;
-                        }
-                    }
-
-                    ByteSequence nameSeq = stream.toByteSequence();
-                    String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(),
"UTF-8").trim();
-                    String value = decodeHeader(headerLine).trim();
-                    headers.put(name, value);
-                } catch (Exception e) {
-                    throw new ProtocolException("Unable to parser header line [" + line +
"]", true);
-                }
-            } else {
-                break;
-            }
-        }
-        return headers;
-    }
-
-    protected int parseContentLength(String contentLength) throws ProtocolException {
-        int length;
-        try {
-            length = Integer.parseInt(contentLength.trim());
-        } catch (NumberFormatException e) {
-            throw new ProtocolException("Specified content-length is not a valid integer",
true);
-        }
-
-        if (length > MAX_DATA_LENGTH) {
-            throw new ProtocolException("The maximum data length was exceeded", true);
-        }
-
-        return length;
-    }
-
-    private String encodeHeader(String header) throws IOException {
-        String result = header;
-        if (this.encodingEnabled) {
-            byte[] utf8buf = header.getBytes("UTF-8");
-            ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length);
-            for(byte val : utf8buf) {
-                switch(val) {
-                case Stomp.ESCAPE:
-                    stream.write(Stomp.ESCAPE_ESCAPE_SEQ);
-                    break;
-                case Stomp.BREAK:
-                    stream.write(Stomp.NEWLINE_ESCAPE_SEQ);
-                    break;
-                case Stomp.COLON:
-                    stream.write(Stomp.COLON_ESCAPE_SEQ);
-                    break;
-                default:
-                    stream.write(val);
-                }
-            }
-            result =  new String(stream.toByteArray(), "UTF-8");
-        }
-
-        return result;
-    }
-
-    private String decodeHeader(InputStream header) throws IOException {
-        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
-        PushbackInputStream stream = new PushbackInputStream(header);
-
-        int value = -1;
-        while( (value = stream.read()) != -1) {
-            if (value == 92) {
-
-                int next = stream.read();
-                if (next != -1) {
-                    switch(next) {
-                    case 110:
-                        decoded.write(Stomp.BREAK);
-                        break;
-                    case 99:
-                        decoded.write(Stomp.COLON);
-                        break;
-                    case 92:
-                        decoded.write(Stomp.ESCAPE);
-                        break;
-                    default:
-                        stream.unread(next);
-                        decoded.write(value);
-                    }
-                } else {
-                    decoded.write(value);
-                }
-
-            } else {
-                decoded.write(value);
-            }
-        }
-
-        return new String(decoded.toByteArray(), "UTF-8");
-    }
-
-    public int getVersion() {
-        return version;
-    }
-
+    /**
+     * @param version the version of the wire format
+     */
     public void setVersion(int version) {
         this.version = version;
     }
 
-    public boolean isEncodingEnabled() {
-        return this.encodingEnabled;
+    /**
+     * @return the version of the wire format
+     */
+    public int getVersion() {
+        return this.version;
     }
 
-    public void setEncodingEnabled(boolean value) {
-        this.encodingEnabled = value;
-    }
 
 }

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java
(from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java
Sun Mar 25 06:33:49 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
@@ -22,8 +22,8 @@ import org.apache.activemq.wireformat.Wi
 /**
  * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a>
protocol.
  */
-public class StompWireFormatFactory implements WireFormatFactory {
+public class MQTTWireFormatFactory implements WireFormatFactory {
     public WireFormat createWireFormat() {
-        return new StompWireFormat();
+        return new MQTTWireFormat();
     }
 }

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java
(from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java
Sun Mar 25 06:33:49 2012
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
 
 import org.apache.activemq.command.Response;
 
+
 /**
- * Interface used by the ProtocolConverter for callbacks.
- * 
- * @author <a href="http://hiramchirino.com">chirino</a> 
+ * Interface used by the MQTTProtocolConverter for callbacks.
  */
 interface ResponseHandler {
-    void onResponse(ProtocolConverter converter, Response response) throws IOException;
+    void onResponse(MQTTProtocolConverter converter, Response response) throws IOException;
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java?rev=1304984&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
Sun Mar 25 06:33:49 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+/** Translates wildcard and path-separator characters in destination names. */
+public class WildCardConvertor {
+
+    // NOTE(review): despite its name this rewrites '#', '+', '/' (MQTT-style tokens) to
+    // '>', '*', '.' (ActiveMQ-style tokens); mapping direction kept as committed - confirm.
+    static String convertActiveMQToMQTT(String name) {
+        // Literal replace: the regex form replaceAll("+", ..) throws PatternSyntaxException.
+        return name.replace('#', '>').replace('+', '*').replace('/', '.');
+    }
+
+    // NOTE(review): rewrites '>', '*', '.' to '#', '+', '/'; direction kept as committed.
+    static String convertMQTTToActiveMQ(String name) {
+        // Literal replace: replaceAll("*", ..) throws and replaceAll(".", "/") hits every char.
+        return name.replace('>', '#').replace('*', '+').replace('.', '/');
+    }
+}

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html
(from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html
Sun Mar 25 06:33:49 2012
@@ -19,8 +19,7 @@
 </head>
 <body>
 
-An implementation of the Stomp protocol which is a simple wire protocol for writing clients
for ActiveMQ in different
-languages like Ruby, Python, PHP, C etc.
+An implementation of the MQTT 3.1 protocol - see http://mqtt.org/
 
 </body>
 </html>

Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt
(from r1303689, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
(original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt
Sun Mar 25 06:33:49 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompTransportFactory
+class=org.apache.activemq.transport.mqtt.MQTTTransportFactory

Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt
(from r1303689, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
(original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt
Sun Mar 25 06:33:49 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompWireFormatFactory
\ No newline at end of file
+class=org.apache.activemq.transport.mqtt.MQTTWireFormatFactory
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
(from r1303689, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
Sun Mar 25 06:33:49 2012
@@ -14,29 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.util.Vector;
-import javax.net.ServerSocketFactory;
-import org.apache.activemq.broker.BrokerPlugin;
+
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.security.JaasDualAuthenticationPlugin;
-import org.apache.activemq.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import static junit.framework.Assert.assertTrue;
-
 // https://issues.apache.org/jira/browse/AMQ-3393
-public class ConnectTest {
-    private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class);
+public class MQTTConnectTest {
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
     BrokerService brokerService;
     Vector<Throwable> exceptions = new Vector<Throwable>();
 
@@ -56,123 +49,18 @@ public class ConnectTest {
     }
 
     @Test
-    public void testStompConnectLeak() throws Exception {
+    public void testConnect() throws Exception {
 
-        brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0");
+        brokerService.addConnector("mqtt://localhost:1883");
         brokerService.start();
-
-        Thread t1 = new Thread() {
-            StompConnection connection = new StompConnection();
-
-            public void run() {
-                try {
-                    connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
-                    connection.connect("system", "manager");
-                    connection.disconnect();
-                } catch (Exception ex) {
-                    LOG.error("unexpected exception on connect/disconnect", ex);
-                    exceptions.add(ex);
-                }
-            }
-        };
-
-        int i = 0;
-        long done = System.currentTimeMillis() + (15 * 1000);
-        while (System.currentTimeMillis() < done) {
-            t1.run();
-            if (++i % 5000 == 0) {
-                LOG.info("connection count on stomp connector:" + brokerService.getTransportConnectors().get(0).connectionCount());
-            }
-        }
-
-        assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
-            }
-        }));
-        assertTrue("no exceptions", exceptions.isEmpty());
+        MQTT mqtt = new MQTT();
+        mqtt.setHost("localhost",1883);
+        BlockingConnection connection = mqtt.blockingConnection();
+
+        connection.connect();
+        Thread.sleep(1000);
+        connection.disconnect();
     }
 
-    @Test
-    public void testJaasDualStopWithOpenConnection() throws Exception {
-
-        brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()});
-        brokerService.addConnector("stomp://0.0.0.0:0?transport.closeAsync=false");
-        brokerService.start();
-
-        final int listenPort = brokerService.getTransportConnectors().get(0).getConnectUri().getPort();
-        Thread t1 = new Thread() {
-            StompConnection connection = new StompConnection();
-
-            public void run() {
-                try {
-                    connection.open("localhost", listenPort);
-                    connection.connect("system", "manager");
-                } catch (Exception ex) {
-                    LOG.error("unexpected exception on connect/disconnect", ex);
-                    exceptions.add(ex);
-                }
-            }
-        };
-
-        t1.run();
-
-        assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
-            }
-        }));
-
-        brokerService.stop();
-
-        // server socket should be available after stop
-        ServerSocket socket = ServerSocketFactory.getDefault().createServerSocket();
-        socket.setReuseAddress(true);
-        InetAddress address = InetAddress.getLocalHost();
-        socket.bind(new InetSocketAddress(address, listenPort));
-        LOG.info("bound address: " + socket);
-        socket.close();
-        assertTrue("no exceptions", exceptions.isEmpty());
-    }
-
-    @Test
-    public void testInactivityMonitor() throws Exception {
-
-        brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false");
-        brokerService.start();
-
-        Thread t1 = new Thread() {
-            StompConnection connection = new StompConnection();
-
-            public void run() {
-                try {
-                    connection.open("localhost",  brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
-                    connection.connect("system", "manager");
-                } catch (Exception ex) {
-                    LOG.error("unexpected exception on connect/disconnect", ex);
-                    exceptions.add(ex);
-                }
-            }
-        };
-
-        t1.run();
-
-        assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
-                 @Override
-                 public boolean isSatisified() throws Exception {
-                     return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
-                 }
-             }));
-
-        // and it should be closed due to inactivity
-        assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
-            }
-        }));
-        assertTrue("no exceptions", exceptions.isEmpty());
-    }
+    
 }
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(from r1303689, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Sun Mar 25 06:33:49 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
 import java.net.Socket;
@@ -41,7 +41,6 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
@@ -50,14 +49,18 @@ import org.apache.activemq.broker.jmx.Br
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.transport.stomp.SamplePojo;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StompTest extends CombinationTestSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
+public class MQTTTest extends CombinationTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
 
-    protected String bindAddress = "stomp://localhost:61613";
+    protected String bindAddress = "mqtt://localhost:1883";
     protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
     protected String jmsUri = "vm://localhost";
 



Mime
View raw message