Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 640A1E090 for ; Mon, 4 Feb 2013 14:17:04 +0000 (UTC) Received: (qmail 52834 invoked by uid 500); 4 Feb 2013 14:17:04 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 52637 invoked by uid 500); 4 Feb 2013 14:17:00 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 52610 invoked by uid 99); 4 Feb 2013 14:16:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Feb 2013 14:16:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Feb 2013 14:16:57 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id BFFB9238896F; Mon, 4 Feb 2013 14:16:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1442128 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/jms/ client/src/test/java/org/apache/qpid/client/ common/src/main/java/org/apache/qpid/configuration/ systests/src/main/j... Date: Mon, 04 Feb 2013 14:16:38 -0000 To: commits@qpid.apache.org From: rgodfrey@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130204141638.BFFB9238896F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rgodfrey Date: Mon Feb 4 14:16:37 2013 New Revision: 1442128 URL: http://svn.apache.org/viewvc?rev=1442128&view=rev Log: QPID-4312 : [Java Client] add option for verification of queue existence during creation of a MessageProducer Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Feb 4 14:16:37 2013 @@ -183,6 +183,9 @@ public class AMQConnection extends Close // new amqp-0-10 list encoded format. private boolean _useLegacyStreamMessageFormat; + // When sending to a Queue destination for the first time, check that the queue is bound + private final boolean _validateQueueOnSend; + //used to track the last failover time for //Address resolution purposes private volatile long _lastFailoverTime = 0; @@ -310,6 +313,18 @@ public class AMQConnection extends Close true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT); } + if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null) + { + _validateQueueOnSend = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)); + } + else + { + _validateQueueOnSend = + Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); + } + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) { @@ -1441,7 +1456,7 @@ public class AMQConnection extends Close { return _delegate.getProtocolVersion(); } - + public String getBrokerUUID() { if(getProtocolVersion().equals(ProtocolVersion.v0_10)) @@ -1565,4 +1580,9 @@ public class AMQConnection extends Close { _delegate.setHeartbeatListener(listener); } + + public boolean validateQueueOnSend() + { + return _validateQueueOnSend; + } } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Feb 4 14:16:37 2013 @@ -584,7 +584,7 @@ public class AMQSession_0_10 extends AMQ rk = routingKey.toString(); } - return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null); + return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map args) @@ -1605,4 +1605,4 @@ public class AMQSession_0_10 extends AMQ } } } -} \ No newline at end of file +} Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Feb 4 14:16:37 2013 @@ -42,6 +42,8 @@ import org.slf4j.Logger; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; private final Logger _logger ; @@ -291,7 +293,6 @@ public abstract class BasicMessageProduc checkPreConditions(); checkInitialDestination(); - synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); @@ -455,7 +456,7 @@ public abstract class BasicMessageProduc JMSException ex = new JMSException("Error validating destination"); ex.initCause(e); ex.setLinkedException(e); - + throw ex; } amqDestination.setExchangeExistsChecked(true); @@ -546,7 +547,7 @@ public abstract class BasicMessageProduc } } - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + private void checkPreConditions() throws JMSException { checkNotClosed(); @@ -560,15 +561,16 @@ public abstract class BasicMessageProduc } } - private void checkInitialDestination() + private void checkInitialDestination() throws JMSException { if (_destination == null) { throw new UnsupportedOperationException("Destination is null"); } + checkValidQueue(); } - private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + private void checkDestination(Destination suppliedDestination) throws JMSException { if ((_destination != null) && (suppliedDestination != null)) { @@ -576,6 +578,11 @@ public abstract class BasicMessageProduc "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } + if(suppliedDestination instanceof AMQQueue) + { + AMQQueue destination = (AMQQueue) suppliedDestination; + checkValidQueue(destination); + } if (suppliedDestination == null) { throw new InvalidDestinationException("Supplied Destination was invalid"); @@ -583,6 +590,42 @@ public abstract class BasicMessageProduc } + void checkValidQueue() throws JMSException + { + if(_destination instanceof AMQQueue) + { + checkValidQueue(_destination); + } + } + void checkValidQueue(AMQDestination destination) throws JMSException + { + if (!destination.isCheckedForQueueBinding() && validateQueueOnSend()) + { + if (getSession().isStrictAMQP()) + { + getLogger().warn("AMQP does not support destination validation before publish, "); + destination.setCheckedForQueueBinding(true); + } + else + { + if (isBound(destination)) + { + destination.setCheckedForQueueBinding(true); + } + else + { + throw new InvalidDestinationException("Queue: " + destination.getName() + + " is not a valid destination (no bindings on server"); + } + } + } + } + + private boolean validateQueueOnSend() + { + return _connection.validateQueueOnSend(); + } + /** * The session used to create this producer */ Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Mon Feb 4 14:16:37 2013 @@ -20,9 +20,8 @@ */ package org.apache.qpid.jms; -import org.apache.qpid.framing.AMQShortString; - import java.util.List; +import org.apache.qpid.framing.AMQShortString; /** Connection URL format @@ -35,7 +34,7 @@ public interface ConnectionURL public static final String AMQ_PROTOCOL = "amqp"; public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence"; public static final String OPTIONS_MAXPREFETCH = "maxprefetch"; - public static final String OPTIONS_SYNC_ACK = "sync_ack"; + public static final String OPTIONS_SYNC_ACK = "sync_ack"; public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format"; public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format"; @@ -62,9 +61,11 @@ public interface ConnectionURL public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; + public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend"; + public static final byte URL_0_8 = 1; public static final byte URL_0_10 = 2; - + String getURL(); String getFailoverMethod(); Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original) +++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Mon Feb 4 14:16:37 2013 @@ -670,7 +670,6 @@ public class AMQSession_0_10Test extends if (m instanceof ExchangeBound) { ExchangeBoundResult struc = new ExchangeBoundResult(); - struc.setQueueNotFound(true); result.setValue(struc); } else if (m instanceof ExchangeQuery) Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Mon Feb 4 14:16:37 2013 @@ -203,6 +203,7 @@ public class ClientProperties * producer/consumer creation when using BindingURLs. */ public static final String QPID_DECLARE_EXCHANGES_PROP_NAME = "qpid.declare_exchanges"; + public static final String VERIFY_QUEUE_ON_SEND = "qpid.verify_queue_on_send"; private ClientProperties() Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Mon Feb 4 14:16:37 2013 @@ -1148,7 +1148,7 @@ public class AddressBasedDestinationTest MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - Queue queue = ssn.createQueue("ADDR:amq.topic/test"); + Topic queue = ssn.createTopic("ADDR:amq.topic/test"); prod.send(queue,ssn.createTextMessage("A")); Message msg = cons.receive(1000); Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java Mon Feb 4 14:16:37 2013 @@ -21,16 +21,23 @@ package org.apache.qpid.test.unit.basic; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - +import java.util.Collections; +import java.util.Map; +import javax.jms.Connection; import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class InvalidDestinationTest extends QpidBrokerTestCase { @@ -52,17 +59,22 @@ public class InvalidDestinationTest exte public void testInvalidDestination() throws Exception { - Queue invalidDestination = new AMQQueue("amq.direct","unknownQ"); - AMQQueue validDestination = new AMQQueue("amq.direct","knownQ"); + QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + Queue invalidDestination = queueSession.createQueue("unknownQ"); + + Queue validDestination = queueSession.createQueue(getTestQueueName()); + // This is the only easy way to create and bind a queue from the API :-( queueSession.createConsumer(validDestination); + QueueSender sender; + TextMessage msg= queueSession.createTextMessage("Hello"); - QueueSender sender = queueSession.createSender(invalidDestination); - TextMessage msg = queueSession.createTextMessage("Hello"); try { + sender = queueSession.createSender(invalidDestination); + sender.send(msg); fail("Expected InvalidDestinationException"); } @@ -70,10 +82,8 @@ public class InvalidDestinationTest exte { // pass } - sender.close(); sender = queueSession.createSender(null); - invalidDestination = new AMQQueue("amq.direct","unknownQ"); try { @@ -86,7 +96,6 @@ public class InvalidDestinationTest exte } sender.send(validDestination,msg); sender.close(); - validDestination = new AMQQueue("amq.direct","knownQ"); sender = queueSession.createSender(validDestination); sender.send(msg); @@ -96,6 +105,71 @@ public class InvalidDestinationTest exte } + + public void testInvalidDestinationOnMessageProducer() throws Exception + { + setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true"); + final AMQConnection connection = (AMQConnection) getConnection(); + doInvalidDestinationOnMessageProducer(connection); + + } + + + public void testInvalidDestinationOnMessageProducerURL() throws Exception + { + Map options = Collections.singletonMap(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND, "true"); + doInvalidDestinationOnMessageProducer(getConnectionWithOptions(options)); + + } + + private void doInvalidDestinationOnMessageProducer(Connection connection) throws JMSException + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue invalidDestination = session.createQueue("unknownQ"); + + Queue validDestination = session.createQueue("knownQ"); + + // This is the only easy way to create and bind a queue from the API :-( + session.createConsumer(validDestination); + + MessageProducer sender; + TextMessage msg = session.createTextMessage("Hello"); + try + { + sender = session.createProducer(invalidDestination); + sender.send(msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + + + sender = session.createProducer(null); + invalidDestination = new AMQQueue("amq.direct","unknownQ"); + + try + { + sender.send(invalidDestination,msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + sender.send(validDestination, msg); + sender.close(); + sender = session.createProducer(validDestination); + sender.send(msg); + + Topic topic = session.createTopic("randomTopic"); + sender = session.createProducer(topic); + sender.send(msg); + } + + public static junit.framework.Test suite() { Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1442128&r1=1442127&r2=1442128&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Mon Feb 4 14:16:37 2013 @@ -20,18 +20,15 @@ package org.apache.qpid.test.utils; import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -49,13 +46,13 @@ import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; - import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; @@ -1131,6 +1128,22 @@ public class QpidBrokerTestCase extends return getConnection(GUEST_USERNAME, GUEST_PASSWORD); } + public Connection getConnectionWithOptions(Map options) + throws URLSyntaxException, NamingException, JMSException + { + ConnectionURL curl = new AMQConnectionURL(getConnectionFactory().getConnectionURLString()); + for(Map.Entry entry : options.entrySet()) + { + curl.setOption(entry.getKey(), entry.getValue()); + } + curl = new AMQConnectionURL(curl.toString()); + + curl.setUsername(GUEST_USERNAME); + curl.setPassword(GUEST_PASSWORD); + return getConnection(curl); + } + + public Connection getConnection(ConnectionURL url) throws JMSException { _logger.info(url.getURL()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org