Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DFE1DD01B for ; Sat, 18 May 2013 16:39:56 +0000 (UTC) Received: (qmail 16447 invoked by uid 500); 18 May 2013 16:39:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 16420 invoked by uid 500); 18 May 2013 16:39:57 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 16412 invoked by uid 99); 18 May 2013 16:39:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 18 May 2013 16:39:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 18 May 2013 16:39:55 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DDB7023889CB; Sat, 18 May 2013 16:39:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1484143 - in /activemq/trunk: activemq-unit-tests/pom.xml activemq-unit-tests/src/test/java/org/apache/activemq/conversions/ activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java pom.xml Date: Sat, 18 May 2013 16:39:35 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130518163935.DDB7023889CB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Sat May 18 16:39:35 2013 New Revision: 1484143 URL: http://svn.apache.org/r1484143 Log: Fixes AMQ-4544: Cant send MQTT message to AMQP endpoints Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java Modified: activemq/trunk/activemq-unit-tests/pom.xml activemq/trunk/pom.xml Modified: activemq/trunk/activemq-unit-tests/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/pom.xml?rev=1484143&r1=1484142&r2=1484143&view=diff ============================================================================== --- activemq/trunk/activemq-unit-tests/pom.xml (original) +++ activemq/trunk/activemq-unit-tests/pom.xml Sat May 18 16:39:35 2013 @@ -156,6 +156,12 @@ + org.apache.qpid + qpid-amqp-1-0-client-jms + ${qpid-jms-version} + test + + org.apache.xbean xbean-spring test Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java?rev=1484143&view=auto ============================================================================== --- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java (added) +++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java Sat May 18 16:39:35 2013 @@ -0,0 +1,104 @@ +package org.apache.activemq.conversions; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; + +import javax.jms.*; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; + +/** + */ +public class AmqpAndMqttTest extends CombinationTestSupport { + + protected BrokerService broker; + private TransportConnector amqpConnector; + private TransportConnector mqttConnector; + + @Override + protected void setUp() throws Exception { + super.setUp(); + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @Override + protected void tearDown() throws Exception { + if( broker!=null ) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + super.tearDown(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + amqpConnector = broker.addConnector("amqp://0.0.0.0:0"); + mqttConnector = broker.addConnector("mqtt://0.0.0.0:0"); + return broker; + } + + + public void testFromMqttToAmqp() throws Exception { + Connection amqp = createAmqpConnection(); + Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO")); + + final BlockingConnection mqtt = createMQTTConnection().blockingConnection(); + mqtt.connect(); + byte[] payload = bytes("Hello World"); + mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false); + mqtt.disconnect(); + + Message msg = consumer.receive(1000 * 5); + assertNotNull(msg); + assertTrue(msg instanceof BytesMessage); + + BytesMessage bmsg = (BytesMessage) msg; + byte[] actual = new byte[(int) bmsg.getBodyLength()]; + bmsg.readBytes(actual); + assertTrue(Arrays.equals(actual, payload)); + amqp.close(); + } + + private byte[] bytes(String value) { + try { + return value.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + + protected MQTT createMQTTConnection() throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setConnectAttemptsMax(1); + mqtt.setReconnectAttemptsMax(0); + mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort()); + return mqtt; + } + + public Connection createAmqpConnection() throws Exception { + final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpConnector.getConnectUri().getPort(), "admin", "password"); + final Connection connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + connection.start(); + return connection; + } + +} Modified: activemq/trunk/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1484143&r1=1484142&r2=1484143&view=diff ============================================================================== --- activemq/trunk/pom.xml (original) +++ activemq/trunk/pom.xml Sat May 18 16:39:35 2013 @@ -65,7 +65,7 @@ 1.0 1.0.0 1.9 - 1.17-SNAPSHOT + 1.17 0.1.8 1.8.0.12 4.2.3 @@ -94,7 +94,7 @@ 10.9.1.0 4.3.1 1.1.2 - 0.3.0-fuse-2 + 0.3.0-fuse-3 0.20 1.3 1.0