Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0F3B817691 for ; Fri, 27 Mar 2015 21:31:53 +0000 (UTC) Received: (qmail 25782 invoked by uid 500); 27 Mar 2015 21:31:53 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 25737 invoked by uid 500); 27 Mar 2015 21:31:52 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 25718 invoked by uid 99); 27 Mar 2015 21:31:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Mar 2015 21:31:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CCB22E1818; Fri, 27 Mar 2015 21:31:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <5468e245ec964bb78a88e845f1f71920@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5684 Date: Fri, 27 Mar 2015 21:31:52 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 05ff52dc1 -> f56ea45e5 https://issues.apache.org/jira/browse/AMQ-5684 Adds a new test case to use when investigating AmqpNetLite test failures with the 'JMS' transformer used on the AMQP TransportConnector. 
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f56ea45e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f56ea45e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f56ea45e Branch: refs/heads/master Commit: f56ea45e58a17fa3aad46cbe8fc605ef4ffdbc81 Parents: 05ff52d Author: Timothy Bish Authored: Fri Mar 27 17:31:46 2015 -0400 Committer: Timothy Bish Committed: Fri Mar 27 17:31:46 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/client/AmqpMessage.java | 166 +++++++++++++++++-- .../amqp/interop/AmqpSendReceiveTest.java | 81 +++++++++ 2 files changed, 234 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f56ea45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 52e5eaf..9db12f9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -16,9 +16,16 @@ */ package org.apache.activemq.transport.amqp.client; +import java.util.HashMap; +import java.util.Map; + import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery; import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; import 
org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.message.Message; @@ -28,6 +35,9 @@ public class AmqpMessage { private final Message message; private final Delivery delivery; + private Map messageAnnotationsMap; + private Map applicationPropertiesMap; + /** * Creates a new AmqpMessage that wraps the information necessary to handle * an outgoing message. @@ -62,13 +72,51 @@ public class AmqpMessage { * @param delivery * the Delivery instance that produced this message. */ + @SuppressWarnings("unchecked") public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) { this.receiver = receiver; this.message = message; this.delivery = delivery; + + if (message.getMessageAnnotations() != null) { + messageAnnotationsMap = message.getMessageAnnotations().getValue(); + } + + if (message.getApplicationProperties() != null) { + applicationPropertiesMap = message.getApplicationProperties().getValue(); + } + } + + //----- Access to internal client resources ------------------------------// + + /** + * @return the AMQP Delivery object linked to a received message. + */ + public Delivery getWrappedDelivery() { + if (delivery != null) { + return new UnmodifiableDelivery(delivery); + } + + return null; } /** + * @return the AMQP Message that is wrapped by this object. + */ + public Message getWrappedMessage() { + return message; + } + + /** + * @return the AmqpReceiver that consumed this message. + */ + public AmqpReceiver getAmqpReceiver() { + return receiver; + } + + //----- Message disposition control --------------------------------------// + + /** * Accepts the message marking it as consumed on the remote peer. * * @throws Exception if an error occurs during the accept. @@ -134,29 +182,96 @@ public class AmqpMessage { receiver.release(delivery); } + //----- Convenience methods for constructing outbound messages -----------// + /** - * @return the AMQP Delivery object linked to a received message. 
+ * Sets the MessageId property on an outbound message using the provided String + * + * @param messageId + * the String message ID value to set. */ - public Delivery getWrappedDelivery() { - if (delivery != null) { - return new UnmodifiableDelivery(delivery); + public void setMessageId(String messageId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setMessageId(messageId); + } + + /** + * Return the set MessageId value in String form, if there are no properties + * in the given message return null. + * + * @return the set message ID in String form or null if not set. + */ + public String getMessageId() { + if (message.getProperties() == null) { + return null; } - return null; + return message.getProperties().getMessageId().toString(); } /** - * @return the AMQP Message that is wrapped by this object. + * Sets a given application property on an outbound message. + * + * @param key + * the name to assign the new property. + * @param value + * the value to set for the named property. */ - public Message getWrappedMessage() { - return message; + public void setApplicationProperty(String key, Object value) { + checkReadOnly(); + lazyCreateApplicationProperties(); + applicationPropertiesMap.put(key, value); } /** - * @return the AmqpReceiver that consumed this message. + * Gets the application property that is mapped to the given name or null + * if no property has been set with that name. + * + * @param key + * the name used to lookup the property in the application properties. + * + * @return the property value or null if not set. */ - public AmqpReceiver getAmqpReceiver() { - return receiver; + public Object getApplicationProperty(String key) { + if (applicationPropertiesMap == null) { + return null; + } + + return applicationPropertiesMap.get(key); + } + + /** + * Perform a proper annotation set on the AMQP Message based on a Symbol key and + * the target value to append to the current annotations. 
+ * + * @param key + * The name of the Symbol whose value is being set. + * @param value + * The new value to set in the annotations of this message. + */ + public void setMessageAnnotation(String key, Object value) { + checkReadOnly(); + lazyCreateMessageAnnotations(); + messageAnnotationsMap.put(Symbol.valueOf(key), value); + } + + /** + * Given a message annotation name, lookup and return the value associated with + * that annotation name. If the message annotations have not been created yet + * then this method will always return null. + * + * @param key + * the Symbol name that should be looked up in the message annotations. + * + * @return the value of the annotation if it exists, or null if not set or not accessible. + */ + public Object getMessageAnnotation(String key) { + if (messageAnnotationsMap == null) { + return null; + } + + return messageAnnotationsMap.get(Symbol.valueOf(key)); } /** @@ -169,11 +284,36 @@ public class AmqpMessage { * @throws IllegalStateException if the message is read only. 
*/ public void setText(String value) throws IllegalStateException { + checkReadOnly(); + AmqpValue body = new AmqpValue(value); + getWrappedMessage().setBody(body); + } + + //----- Internal implementation ------------------------------------------// + + private void checkReadOnly() throws IllegalStateException { if (delivery != null) { throw new IllegalStateException("Message is read only."); } + } - AmqpValue body = new AmqpValue(value); - getWrappedMessage().setBody(body); + private void lazyCreateMessageAnnotations() { + if (messageAnnotationsMap == null) { + messageAnnotationsMap = new HashMap(); + message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap)); + } + } + + private void lazyCreateApplicationProperties() { + if (applicationPropertiesMap == null) { + applicationPropertiesMap = new HashMap(); + message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap)); + } + } + + private void lazyCreateProperties() { + if (message.getProperties() == null) { + message.setProperties(new Properties()); + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f56ea45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java new file mode 100644 index 0000000..b16ef59 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test basic send and receive scenarios using only AMQP sender and receiver links. 
+ */ +public class AmqpSendReceiveTest extends AmqpClientTestSupport { + + @Ignore("Test fails when JMS transformer is in play") + @Test(timeout = 60000) + public void testCloseBusyReceiver() throws Exception { + final int MSG_COUNT = 20; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + i); + message.setMessageAnnotation("serialNo", i); + message.setText("Test-Message"); + + sender.send(message); + } + + sender.close(); + + QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(20, queue.getQueueSize()); + + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + receiver1.flow(MSG_COUNT); + AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS); + assertEquals("msg0", received.getMessageId()); + receiver1.close(); + + AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); + receiver2.flow(200); + for (int i = 0; i < MSG_COUNT; ++i) { + received = receiver1.receive(5, TimeUnit.SECONDS); + assertEquals("msg" + i, received.getMessageId()); + } + + receiver2.close(); + connection.close(); + } +}