Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java (added) +++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java Wed Oct 3 14:15:01 2012 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.command.Response; + +import java.io.IOException; + + +/** + * Interface used by the AMQPProtocolConverter for callbacks. + */ +interface ResponseHandler { + void onResponse(AmqpProtocolConverter converter, Response response) throws IOException; +} Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html (added) +++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html Wed Oct 3 14:15:01 2012 @@ -0,0 +1,25 @@ + + + + + + +A Broker side implementation of the AMQP 3.1 protocol - see http://amqp.org/ + + + Added: activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp (added) +++ activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp Wed Oct 3 14:15:01 2012 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpTransportFactory Added: activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp%2Bnio?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio (added) +++ activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio Wed Oct 3 14:15:01 2012 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpNioTransportFactory Added: activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp%2Bnio%2Bssl?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl (added) +++ activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl Wed Oct 3 14:15:01 2012 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpNioSslTransportFactory \ No newline at end of file Added: activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp%2Bssl?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl (added) +++ activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl Wed Oct 3 14:15:01 2012 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory Added: activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp (added) +++ activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp Wed Oct 3 14:15:01 2012 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpWireFormatFactory \ No newline at end of file Added: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java (added) +++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java Wed Oct 3 14:15:01 2012 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerService; + +public class AmqpNioTest extends AmqpTest { + protected void addAMQPConnector(BrokerService brokerService) throws Exception { + brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1"); + } + +} Added: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java (added) +++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java Wed Oct 3 14:15:01 2012 @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerService; +import org.junit.Ignore; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +@Ignore("hangs atm, needs investigation") +public class AmqpSslTest extends AmqpTest { + public void startBroker() throws Exception { + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + super.startBroker(); + } + + protected void addAMQPConnector(BrokerService brokerService) throws Exception { + brokerService.addConnector("amqp+ssl://localhost:8883"); + } + +// protected AMQP createAMQPConnection() throws Exception { +// AMQP amqp = new AMQP(); +// amqp.setHost("ssl://localhost:8883"); +// SSLContext ctx = SSLContext.getInstance("TLS"); +// ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); +// amqp.setSslContext(ctx); +// return amqp; +// } + + static class DefaultTrustManager implements X509TrustManager { + + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + +} Added: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java (added) +++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java Wed Oct 3 14:15:01 2012 @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.AutoFailTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.util.ByteSequence; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.fusesource.hawtbuf.UTF8Buffer.utf8; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + + +public class AmqpTest { + protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class); + protected BrokerService brokerService; + protected Vector exceptions = new Vector(); + protected int numberOfMessages; + AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {}; + protected int port; + + @Before + public void startBroker() throws Exception { + autoFailTestSupport.startAutoFailThread(); + exceptions.clear(); + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + addAMQPConnector(); + brokerService.start(); + this.numberOfMessages = 2000; + } + + protected void addAMQPConnector() throws Exception { + final TransportConnector connector = brokerService.addConnector("amqp://localhost:0"); + port = connector.getConnectUri().getPort(); + } + + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService = null; + } + autoFailTestSupport.stopAutoFailThread(); + } + + + +// @Test +// public void testSendAndReceiveAMQP() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP amqp = new AMQP(); +// final BlockingConnection subscribeConnection = amqp.blockingConnection(); +// subscribeConnection.connect(); +// Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE); +// Topic[] topics = {topic}; +// subscribeConnection.subscribe(topics); +// final CountDownLatch latch = new CountDownLatch(numberOfMessages); +// +// Thread thread = new Thread(new Runnable() { +// public void run() { +// for (int i = 0; i < numberOfMessages; i++){ +// try { +// Message message = subscribeConnection.receive(); +// message.ack(); +// latch.countDown(); +// } catch (Exception e) { +// e.printStackTrace(); +// break; +// } +// +// } +// } +// }); +// thread.start(); +// +// BlockingConnection publisherConnection = amqp.blockingConnection(); +// publisherConnection.connect(); +// for (int i = 0; i < numberOfMessages; i++){ +// String payload = "Message " + i; +// publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false); +// } +// +// latch.await(10, TimeUnit.SECONDS); +// assertEquals(0, latch.getCount()); +// } +// +// @Test +// public void testSendAndReceiveAtMostOnce() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// amqp.setKeepAlive(Short.MAX_VALUE); +// BlockingConnection connection = amqp.blockingConnection(); +// +// connection.connect(); +// +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)}; +// connection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false); +// Message message = connection.receive(); +// assertEquals(payload, new String(message.getPayload())); +// } +// connection.disconnect(); +// } +// +// @Test +// public void testSendAndReceiveAtLeastOnce() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// amqp.setKeepAlive(Short.MAX_VALUE); +// BlockingConnection connection = amqp.blockingConnection(); +// +// connection.connect(); +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}; +// connection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +// Message message = connection.receive(); +// message.ack(); +// assertEquals(payload, new String(message.getPayload())); +// } +// connection.disconnect(); +// } +// +// @Test +// public void testSendAndReceiveExactlyOnce() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.start(); +// AMQP publisher = createAMQPConnection(); +// BlockingConnection pubConnection = publisher.blockingConnection(); +// +// pubConnection.connect(); +// +// AMQP subscriber = createAMQPConnection(); +// BlockingConnection subConnection = subscriber.blockingConnection(); +// +// subConnection.connect(); +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)}; +// subConnection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false); +// Message message = subConnection.receive(); +// message.ack(); +// assertEquals(payload, new String(message.getPayload())); +// } +// subConnection.disconnect(); +// pubConnection.disconnect(); +// } +// +// @Test +// public void testSendAndReceiveLargeMessages() throws Exception { +// byte[] payload = new byte[1024 * 32]; +// for (int i = 0; i < payload.length; i++){ +// payload[i] = '2'; +// } +// addAMQPConnector(brokerService); +// brokerService.start(); +// +// AMQP publisher = createAMQPConnection(); +// BlockingConnection pubConnection = publisher.blockingConnection(); +// +// pubConnection.connect(); +// +// AMQP subscriber = createAMQPConnection(); +// BlockingConnection subConnection = subscriber.blockingConnection(); +// +// subConnection.connect(); +// +// Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}; +// subConnection.subscribe(topics); +// for (int i = 0; i < 10; i++) { +// pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false); +// Message message = subConnection.receive(); +// message.ack(); +// assertArrayEquals(payload, message.getPayload()); +// } +// subConnection.disconnect(); +// pubConnection.disconnect(); +// } +// +// +// @Test +// public void testSendAMQPReceiveJMS() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// BlockingConnection connection = amqp.blockingConnection(); +// final String DESTINATION_NAME = "foo.*"; +// connection.connect(); +// +// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); +// activeMQConnection.start(); +// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); +// javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME); +// MessageConsumer consumer = s.createConsumer(jmsTopic); +// +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "Test Message: " + i; +// connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +// ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); +// ByteSequence bs = message.getContent(); +// assertEquals(payload, new String(bs.data, bs.offset, bs.length)); +// } +// +// +// activeMQConnection.close(); +// connection.disconnect(); +// } +// +// @Test +// public void testSendJMSReceiveAMQP() throws Exception { +// addAMQPConnector(brokerService); +// brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); +// brokerService.start(); +// AMQP amqp = createAMQPConnection(); +// amqp.setKeepAlive(Short.MAX_VALUE); +// BlockingConnection connection = amqp.blockingConnection(); +// connection.connect(); +// +// ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); +// activeMQConnection.start(); +// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); +// javax.jms.Topic jmsTopic = s.createTopic("foo.far"); +// MessageProducer producer = s.createProducer(jmsTopic); +// +// Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)}; +// connection.subscribe(topics); +// for (int i = 0; i < numberOfMessages; i++) { +// String payload = "This is Test Message: " + i; +// TextMessage sendMessage = s.createTextMessage(payload); +// producer.send(sendMessage); +// Message message = connection.receive(); +// message.ack(); +// assertEquals(payload, new String(message.getPayload())); +// } +// connection.disconnect(); +// } +// +// + + +} \ No newline at end of file Added: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java (added) +++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java Wed Oct 3 14:15:01 2012 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import com.swiftmq.amqp.AMQPContext; +import com.swiftmq.amqp.v100.client.*; +import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue; +import com.swiftmq.amqp.v100.messaging.AMQPMessage; +import com.swiftmq.amqp.v100.types.AMQPString; +import com.swiftmq.amqp.v100.types.AMQPType; +import org.junit.Test; + +/** + * @author Hiram Chirino + */ +public class SwiftMQClientTest extends AmqpTest { + + @Test + public void testSendReceive() throws Exception { + + String queue = "testqueue"; + int nMsgs = 1; + int qos = QoS.AT_MOST_ONCE; + AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT); + + try { + + Connection connection = new Connection(ctx, "127.0.0.1", port, false); + connection.setContainerId("client"); + connection.setIdleTimeout(-1); + connection.setMaxFrameSize(1024 * 4); + connection.setExceptionListener(new ExceptionListener() { + public void onException(Exception e) { + e.printStackTrace(); + } + }); + connection.connect(); + { + String data = String.format("%010d", 0); + + Session session = connection.createSession(10, 10); + Producer p = session.createProducer(queue, qos); + for (int i = 0; i < nMsgs; i++) { + AMQPMessage msg = new AMQPMessage(); + String s = "Message #" + (i + 1); + System.out.println("Sending " + s); + msg.setAmqpValue(new AmqpValue(new AMQPString(s + ", data: " + data))); + p.send(msg); + } + p.close(); + session.close(); + } +// { +// Session session = connection.createSession(10, 10); +// Consumer c = session.createConsumer(queue, 100, qos, true, null); +// +// // Receive messages non-transacted +// for (int i = 0; i < nMsgs; i++) { +// AMQPMessage msg = c.receive(); +// final AMQPType value = msg.getAmqpValue().getValue(); +// if (value instanceof AMQPString) { +// AMQPString s = (AMQPString) value; +// System.out.println("Received: " + s.getValue()); +// } +// if (!msg.isSettled()) +// msg.accept(); +// } +// c.close(); +// session.close(); +// } + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + +} Added: activemq/trunk/activemq-amqp/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/resources/log4j.properties?rev=1393500&view=auto ============================================================================== --- activemq/trunk/activemq-amqp/src/test/resources/log4j.properties (added) +++ activemq/trunk/activemq-amqp/src/test/resources/log4j.properties Wed Oct 3 14:15:01 2012 @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# The logging properties used during tests.. +# +log4j.rootLogger=WARN, console, file +log4j.logger.org.apache.activemq=INFO +log4j.logger.org.fusesource=INFO + +# Console will only display warnnings +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n +log4j.appender.console.threshold=TRACE + +# File appender will contain all info messages +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n +log4j.appender.file.file=target/test.log +log4j.appender.file.append=true Propchange: activemq/trunk/activemq-amqp/src/test/resources/log4j.properties ------------------------------------------------------------------------------ svn:executable = *