Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 61DEB1179E for ; Mon, 4 Aug 2014 22:58:29 +0000 (UTC) Received: (qmail 82303 invoked by uid 500); 4 Aug 2014 22:58:29 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 82260 invoked by uid 500); 4 Aug 2014 22:58:29 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 82251 invoked by uid 99); 4 Aug 2014 22:58:29 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Aug 2014 22:58:29 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0601C91FF7A; Mon, 4 Aug 2014 22:58:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <074f0f2f64bc4be691863feee45d06b8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: https://issues.apache.org/jira/browse/AMQ-5307 Date: Mon, 4 Aug 2014 22:58:28 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/trunk 9743dbddb -> 7c04ead46 https://issues.apache.org/jira/browse/AMQ-5307 Fixed the MQTTCodec to properly handle frames that come in split up or bunched together. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c04ead4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c04ead4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c04ead4 Branch: refs/heads/trunk Commit: 7c04ead460f646517f39b365ed6168dd1c1e556c Parents: 9743dbd Author: Timothy Bish Authored: Mon Aug 4 18:58:03 2014 -0400 Committer: Timothy Bish Committed: Mon Aug 4 18:58:03 2014 -0400 ---------------------------------------------------------------------- .../activemq/transport/mqtt/MQTTCodec.java | 243 ++++++++++--------- .../transport/mqtt/MQTTNIOSSLTransport.java | 4 +- .../transport/mqtt/MQTTProtocolSupport.java | 48 ++++ .../transport/mqtt/MQTTAuthTestSupport.java | 8 + .../activemq/transport/mqtt/MQTTAuthTests.java | 8 +- .../activemq/transport/mqtt/MQTTCodecTest.java | 178 ++++++++++++++ .../activemq/transport/mqtt/MQTTNIOSSLTest.java | 33 +++ 7 files changed, 407 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java index f3e16aa..c892dd1 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java @@ -18,144 +18,165 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; -import javax.jms.JMSException; import org.apache.activemq.transport.tcp.TcpTransport; import org.fusesource.hawtbuf.DataByteArrayInputStream; import org.fusesource.hawtbuf.DataByteArrayOutputStream; -import org.fusesource.mqtt.codec.*; +import org.fusesource.mqtt.codec.MQTTFrame; public class MQTTCodec { - TcpTransport transport; + private final MQTTFrameSink frameSink; + private final DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream(); + private byte header; - DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream(); - boolean processedHeader = false; - String action; - byte header; - int contentLength = -1; - int previousByte = -1; - int payLoadRead = 0; + private int contentLength = -1; + private int payLoadRead = 0; - public MQTTCodec(TcpTransport transport) { - this.transport = transport; + public interface MQTTFrameSink { + void onFrame(MQTTFrame mqttFrame); } - public void parse(DataByteArrayInputStream input, int readSize) throws Exception { - int i = 0; - byte b; - while (i++ < readSize) { - b = input.readByte(); - // skip repeating nulls - if (!processedHeader && b == 0) { - previousByte = 0; - continue; - } + private FrameParser currentParser; - if (!processedHeader) { - i += processHeader(b, input); - if (contentLength == 0) { - processCommand(); - } + // Internal parsers implement this and we switch to the next as we go. + private interface FrameParser { - } else { + void parse(DataByteArrayInputStream data, int readSize) throws IOException; - if (contentLength == -1) { - // end of command reached, unmarshal - if (b == 0) { - processCommand(); - } else { - currentCommand.write(b); - } - } else { - // read desired content length - if (payLoadRead == contentLength) { - processCommand(); - i += processHeader(b, input); - } else { - currentCommand.write(b); - payLoadRead++; - } - } + void reset() throws IOException; + } + + public MQTTCodec(MQTTFrameSink sink) { + this.frameSink = sink; + } + + public MQTTCodec(final TcpTransport transport) { + this.frameSink = new MQTTFrameSink() { + + @Override + public void onFrame(MQTTFrame mqttFrame) { + transport.doConsume(mqttFrame); } + }; + } - previousByte = b; - } - if (processedHeader && payLoadRead == contentLength) { - processCommand(); + public void parse(DataByteArrayInputStream input, int readSize) throws Exception { + if (currentParser == null) { + currentParser = initializeHeaderParser(); } + + // Parser stack will run until current incoming data has all been consumed. + currentParser.parse(input, readSize); } - /** - * sets the content length - * - * @return number of bytes read - */ - private int processHeader(byte header, DataByteArrayInputStream input) { - this.header = header; - byte digit; - int multiplier = 1; - int read = 0; - int length = 0; - do { - digit = input.readByte(); - length += (digit & 0x7F) * multiplier; - multiplier <<= 7; - read++; - } while ((digit & 0x80) != 0); - - contentLength = length; - processedHeader = true; - return read; + private void processCommand() throws IOException { + MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header); + frameSink.onFrame(frame); } + //----- Prepare the current frame parser for use -------------------------// - private void processCommand() throws Exception { - MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header); - transport.doConsume(frame); - processedHeader = false; - currentCommand.reset(); - contentLength = -1; - payLoadRead = 0; + private FrameParser initializeHeaderParser() throws IOException { + headerParser.reset(); + return headerParser; } - public static String commandType(byte header) throws IOException, JMSException { + private FrameParser initializeVariableLengthParser() throws IOException { + variableLengthParser.reset(); + return variableLengthParser; + } - byte messageType = (byte) ((header & 0xF0) >>> 4); - switch (messageType) { - case PINGREQ.TYPE: { - return "PINGREQ"; - } - case CONNECT.TYPE: { - return "CONNECT"; - } - case DISCONNECT.TYPE: { - return "DISCONNECT"; - } - case SUBSCRIBE.TYPE: { - return "SUBSCRIBE"; - } - case UNSUBSCRIBE.TYPE: { - return "UNSUBSCRIBE"; - } - case PUBLISH.TYPE: { - return "PUBLISH"; - } - case PUBACK.TYPE: { - return "PUBACK"; - } - case PUBREC.TYPE: { - return "PUBREC"; - } - case PUBREL.TYPE: { - return "PUBREL"; + private FrameParser initializeContentParser() throws IOException { + contentParser.reset(); + return contentParser; + } + + //----- Frame parser implementations -------------------------------------// + + private final FrameParser headerParser = new FrameParser() { + + @Override + public void parse(DataByteArrayInputStream data, int readSize) throws IOException { + int i = 0; + while (i++ < readSize) { + byte b = data.readByte(); + // skip repeating nulls + if (b == 0) { + continue; + } + + header = b; + + currentParser = initializeVariableLengthParser(); + currentParser.parse(data, readSize - 1); + return; } - case PUBCOMP.TYPE: { - return "PUBCOMP"; + } + + @Override + public void reset() throws IOException { + header = -1; + } + }; + + private final FrameParser contentParser = new FrameParser() { + + @Override + public void parse(DataByteArrayInputStream data, int readSize) throws IOException { + int i = 0; + while (i++ < readSize) { + currentCommand.write(data.readByte()); + payLoadRead++; + + if (payLoadRead == contentLength) { + processCommand(); + currentParser = initializeHeaderParser(); + currentParser.parse(data, readSize - i); + return; + } } - default: - return "UNKNOWN"; } - } + @Override + public void reset() throws IOException { + contentLength = -1; + payLoadRead = 0; + currentCommand.reset(); + } + }; + + private final FrameParser variableLengthParser = new FrameParser() { + + private byte digit; + private int multiplier = 1; + private int length; + + @Override + public void parse(DataByteArrayInputStream data, int readSize) throws IOException { + int i = 0; + while (i++ < readSize) { + digit = data.readByte(); + length += (digit & 0x7F) * multiplier; + multiplier <<= 7; + if ((digit & 0x80) == 0) { + if (length == 0) { + processCommand(); + currentParser = initializeHeaderParser(); + } else { + currentParser = initializeContentParser(); + contentLength = length; + } + currentParser.parse(data, readSize - i); + return; + } + } + } + @Override + public void reset() throws IOException { + digit = 0; + multiplier = 1; + length = 0; + } + }; } http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java index ef18fe0..f982848 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java @@ -23,13 +23,14 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import javax.net.SocketFactory; + import org.apache.activemq.transport.nio.NIOSSLTransport; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.DataByteArrayInputStream; public class MQTTNIOSSLTransport extends NIOSSLTransport { - MQTTCodec codec; + private MQTTCodec codec; public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -55,5 +56,4 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport { DataByteArrayInputStream dis = new DataByteArrayInputStream(fill); codec.parse(dis, fill.length); } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java index 2def25c..a30ed50 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java @@ -16,6 +16,17 @@ */ package org.apache.activemq.transport.mqtt; +import org.fusesource.mqtt.codec.CONNECT; +import org.fusesource.mqtt.codec.DISCONNECT; +import org.fusesource.mqtt.codec.PINGREQ; +import org.fusesource.mqtt.codec.PUBACK; +import org.fusesource.mqtt.codec.PUBCOMP; +import org.fusesource.mqtt.codec.PUBLISH; +import org.fusesource.mqtt.codec.PUBREC; +import org.fusesource.mqtt.codec.PUBREL; +import org.fusesource.mqtt.codec.SUBSCRIBE; +import org.fusesource.mqtt.codec.UNSUBSCRIBE; + /** * A set of static methods useful for handling MQTT based client connections. */ @@ -70,4 +81,41 @@ public class MQTTProtocolSupport { public static String convertActiveMQToMQTT(String destinationName) { return destinationName.replace('.', '/'); } + + /** + * Given an MQTT header byte, determine the command type that the header + * represents. + * + * @param header + * the byte value for the MQTT frame header. + * + * @return a string value for the given command type. + */ + public static String commandType(byte header) { + byte messageType = (byte) ((header & 0xF0) >>> 4); + switch (messageType) { + case PINGREQ.TYPE: + return "PINGREQ"; + case CONNECT.TYPE: + return "CONNECT"; + case DISCONNECT.TYPE: + return "DISCONNECT"; + case SUBSCRIBE.TYPE: + return "SUBSCRIBE"; + case UNSUBSCRIBE.TYPE: + return "UNSUBSCRIBE"; + case PUBLISH.TYPE: + return "PUBLISH"; + case PUBACK.TYPE: + return "PUBACK"; + case PUBREC.TYPE: + return "PUBREC"; + case PUBREL.TYPE: + return "PUBREL"; + case PUBCOMP.TYPE: + return "PUBCOMP"; + default: + return "UNKNOWN"; + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java index bbe4d1a..6e327bf 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java @@ -34,6 +34,14 @@ import org.apache.activemq.security.TempDestinationAuthorizationEntry; */ public class MQTTAuthTestSupport extends MQTTTestSupport { + public MQTTAuthTestSupport() { + super(); + } + + public MQTTAuthTestSupport(String connectorScheme, boolean useSSL) { + super(connectorScheme, useSSL); + } + @Override protected BrokerPlugin configureAuthentication() throws Exception { List users = new ArrayList(); http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java index 3cb856f..574d554 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java @@ -66,11 +66,15 @@ public class MQTTAuthTests extends MQTTAuthTestSupport { return Arrays.asList(new Object[][] { {"mqtt", false}, {"mqtt+ssl", true}, - {"mqtt+nio", false} - // TODO - Fails {"mqtt+nio+ssl", true} + {"mqtt+nio", false}, + {"mqtt+nio+ssl", true} }); } + public MQTTAuthTests(String connectorScheme, boolean useSSL) { + super(connectorScheme, useSSL); + } + @Test(timeout = 60 * 1000) public void testAnonymousUserConnect() throws Exception { MQTT mqtt = createMQTTConnection(); http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java new file mode 100644 index 0000000..31af1ab --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCodecTest.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.mqtt.codec.CONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests the functionality of the MQTTCodec class. + */ +public class MQTTCodecTest { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTCodecTest.class); + + private final MQTTWireFormat wireFormat = new MQTTWireFormat(); + + private List frames; + private MQTTCodec codec; + + @Before + public void setUp() throws Exception { + frames = new ArrayList(); + codec = new MQTTCodec(new MQTTCodec.MQTTFrameSink() { + + @Override + public void onFrame(MQTTFrame mqttFrame) { + frames.add(mqttFrame); + } + }); + } + + @Test + public void testEmptyConnectBytes() throws Exception { + + CONNECT connect = new CONNECT(); + connect.cleanSession(true); + connect.clientId(new UTF8Buffer("")); + + DataByteArrayOutputStream output = new DataByteArrayOutputStream(); + wireFormat.marshal(connect.encode(), output); + Buffer marshalled = output.toBuffer(); + + DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled); + codec.parse(input, marshalled.length()); + + assertTrue(!frames.isEmpty()); + assertEquals(1, frames.size()); + + connect = new CONNECT().decode(frames.get(0)); + LOG.info("Unmarshalled: {}", connect); + assertTrue(connect.cleanSession()); + } + + @Test + public void testConnectWithCredentialsBackToBack() throws Exception { + + CONNECT connect = new CONNECT(); + connect.cleanSession(false); + connect.clientId(new UTF8Buffer("test")); + connect.userName(new UTF8Buffer("user")); + connect.password(new UTF8Buffer("pass")); + + DataByteArrayOutputStream output = new DataByteArrayOutputStream(); + wireFormat.marshal(connect.encode(), output); + wireFormat.marshal(connect.encode(), output); + Buffer marshalled = output.toBuffer(); + + DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled); + codec.parse(input, marshalled.length()); + + assertTrue(!frames.isEmpty()); + assertEquals(2, frames.size()); + + for (MQTTFrame frame : frames) { + connect = new CONNECT().decode(frame); + LOG.info("Unmarshalled: {}", connect); + assertFalse(connect.cleanSession()); + assertEquals("user", connect.userName().toString()); + assertEquals("pass", connect.password().toString()); + assertEquals("test", connect.clientId().toString()); + } + } + + @Test + public void testProcessInChunks() throws Exception { + + CONNECT connect = new CONNECT(); + connect.cleanSession(false); + connect.clientId(new UTF8Buffer("test")); + connect.userName(new UTF8Buffer("user")); + connect.password(new UTF8Buffer("pass")); + + DataByteArrayOutputStream output = new DataByteArrayOutputStream(); + wireFormat.marshal(connect.encode(), output); + Buffer marshalled = output.toBuffer(); + + DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled); + + int first = marshalled.length() / 2; + int second = marshalled.length() - first; + + codec.parse(input, first); + codec.parse(input, second); + + assertTrue(!frames.isEmpty()); + assertEquals(1, frames.size()); + + connect = new CONNECT().decode(frames.get(0)); + LOG.info("Unmarshalled: {}", connect); + assertFalse(connect.cleanSession()); + + assertEquals("user", connect.userName().toString()); + assertEquals("pass", connect.password().toString()); + assertEquals("test", connect.clientId().toString()); + } + + @Test + public void testProcessInBytes() throws Exception { + + CONNECT connect = new CONNECT(); + connect.cleanSession(false); + connect.clientId(new UTF8Buffer("test")); + connect.userName(new UTF8Buffer("user")); + connect.password(new UTF8Buffer("pass")); + + DataByteArrayOutputStream output = new DataByteArrayOutputStream(); + wireFormat.marshal(connect.encode(), output); + Buffer marshalled = output.toBuffer(); + + DataByteArrayInputStream input = new DataByteArrayInputStream(marshalled); + + int size = marshalled.length(); + + for (int i = 0; i < size; ++i) { + codec.parse(input, 1); + } + + assertTrue(!frames.isEmpty()); + assertEquals(1, frames.size()); + + connect = new CONNECT().decode(frames.get(0)); + LOG.info("Unmarshalled: {}", connect); + assertFalse(connect.cleanSession()); + + assertEquals("user", connect.userName().toString()); + assertEquals("pass", connect.password().toString()); + assertEquals("test", connect.clientId().toString()); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/7c04ead4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java new file mode 100644 index 0000000..b6dd9f9 --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +/** + * Run the basic tests with the NIO Transport. + */ +public class MQTTNIOSSLTest extends MQTTTest { + + @Override + public String getProtocolScheme() { + return "mqtt+nio+ssl"; + } + + @Override + public boolean isUseSSL() { + return true; + } +}