Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A7B14CA0E for ; Tue, 22 May 2012 15:23:18 +0000 (UTC) Received: (qmail 83479 invoked by uid 500); 22 May 2012 15:23:18 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 83409 invoked by uid 500); 22 May 2012 15:23:18 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 83399 invoked by uid 99); 22 May 2012 15:23:17 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 May 2012 15:23:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 May 2012 15:23:16 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 305C92388847 for ; Tue, 22 May 2012 15:22:56 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1341521 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Date: Tue, 22 May 2012 15:22:56 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120522152256.305C92388847@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Tue May 22 15:22:55 2012 New Revision: 1341521 URL: http://svn.apache.org/viewvc?rev=1341521&view=rev Log: Fix for https://issues.apache.org/jira/browse/AMQ-3855 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1341521&r1=1341520&r2=1341521&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Tue May 22 15:22:55 2012 @@ -261,8 +261,6 @@ class MQTTProtocolConverter { QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException { ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); - - if (destination == null) { throw new MQTTProtocolException("Invalid Destination."); } @@ -458,31 +456,15 @@ class MQTTProtocolConverter { } result.topicName(topicName); - ByteSequence byteSequence = message.getContent(); - if (message.isCompressed()) { - Inflater inflater = new Inflater(); - inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); - byte[] data = new byte[4096]; - int read; - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - while ((read = inflater.inflate(data, 0, data.length)) != 0) { - bytesOut.write(data, 0, read); - } - byteSequence = bytesOut.toByteSequence(); - } if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { - if (byteSequence.getLength() > 4) { - byte[] content = new byte[byteSequence.getLength() - 4]; - System.arraycopy(byteSequence.data, 4, content, 0, content.length); - result.payload(new Buffer(content)); - } else { ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); + msg.setReadOnlyBody(true); String messageText = msg.getText(); if (messageText != null) { - result.payload(new Buffer(msg.getText().getBytes("UTF-8"))); + result.payload(new Buffer(messageText.getBytes("UTF-8"))); } - } + } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { @@ -491,8 +473,29 @@ class MQTTProtocolConverter { byte[] data = new byte[(int) msg.getBodyLength()]; msg.readBytes(data); result.payload(new Buffer(data)); - } else { + } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE){ + ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); + msg.setReadOnlyBody(true); + Map map = msg.getContentMap(); + if (map != null){ + result.payload(new Buffer(map.toString().getBytes("UTF-8"))); + } + } + + else { + ByteSequence byteSequence = message.getContent(); if (byteSequence != null && byteSequence.getLength() > 0) { + if (message.isCompressed()){ + Inflater inflater = new Inflater(); + inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length); + byte[] data = new byte[4096]; + int read; + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + while((read = inflater.inflate(data)) != 0){ + bytesOut.write(data,0,read); + } + byteSequence = bytesOut.toByteSequence(); + } result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); } } @@ -620,9 +623,9 @@ class MQTTProtocolConverter { } private String convertMQTTToActiveMQ(String name) { - String result = name.replace('>', '#'); - result = result.replace('*', '+'); - result = result.replace('.', '/'); + String result = name.replace('#', '>'); + result = result.replace('+', '*'); + result = result.replace('/', '.'); return result; } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1341521&r1=1341520&r2=1341521&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Tue May 22 15:22:55 2012 @@ -17,9 +17,13 @@ package org.apache.activemq.transport.mqtt; import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -63,6 +67,47 @@ public class MQTTTest { } @Test + public void testSendAndReceiveMQTT() throws Exception { + addMQTTConnector(brokerService); + brokerService.start(); + MQTT mqtt = new MQTT(); + final BlockingConnection subscribeConnection = mqtt.blockingConnection(); + subscribeConnection.connect(); + Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE); + Topic[] topics = {topic}; + subscribeConnection.subscribe(topics); + final CountDownLatch latch = new CountDownLatch(numberOfMessages); + + Thread thread = new Thread(new Runnable() { + public void run() { + for (int i = 0; i < numberOfMessages; i++){ + try { + Message message = subscribeConnection.receive(); + message.ack(); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + + } + } + }); + thread.start(); + + BlockingConnection publisherConnection = mqtt.blockingConnection(); + publisherConnection.connect(); + for (int i = 0; i < numberOfMessages; i++){ + String payload = "Message " + i; + publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false); + } + + latch.await(10, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + + } + + @Test public void testSendAndReceiveAtMostOnce() throws Exception { addMQTTConnector(brokerService); brokerService.start(); @@ -172,7 +217,7 @@ public class MQTTTest { brokerService.start(); MQTT mqtt = createMQTTConnection(); BlockingConnection connection = mqtt.blockingConnection(); - final String DESTINATION_NAME = "foo"; + final String DESTINATION_NAME = "foo.*"; connection.connect(); ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); @@ -183,7 +228,7 @@ public class MQTTTest { for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; - connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false); ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); ByteSequence bs = message.getContent(); assertEquals(payload, new String(bs.data, bs.offset, bs.length)); @@ -194,6 +239,36 @@ public class MQTTTest { connection.disconnect(); } + @Test + public void testSendJMSReceiveMQTT() throws Exception { + addMQTTConnector(brokerService); + brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setKeepAlive(Short.MAX_VALUE); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic jmsTopic = s.createTopic("foo.far"); + MessageProducer producer = s.createProducer(jmsTopic); + + Topic[] topics = {new Topic(utf8("foo/far"), QoS.AT_MOST_ONCE)}; + connection.subscribe(topics); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "This is Test Message: " + i; + TextMessage sendMessage = s.createTextMessage(payload); + producer.send(sendMessage); + Message message = connection.receive(); + message.ack(); + assertEquals(payload, new String(message.getPayload())); + } + connection.disconnect(); + } + + protected void addMQTTConnector(BrokerService brokerService) throws Exception { brokerService.addConnector("mqtt://localhost:1883"); }